diff --git a/entitydb-api/src/main/java/ai/philterd/entitydb/api/EntityDbRestApiController.java b/entitydb-api/src/main/java/ai/philterd/entitydb/api/EntityDbRestApiController.java index 0828501..2e232b6 100644 --- a/entitydb-api/src/main/java/ai/philterd/entitydb/api/EntityDbRestApiController.java +++ b/entitydb-api/src/main/java/ai/philterd/entitydb/api/EntityDbRestApiController.java @@ -69,24 +69,22 @@ public class EntityDbRestApiController { private static final Logger LOGGER = LogManager.getLogger(EntityDbRestApiController.class); - @Autowired - private EntityQueryService entityQueryService; - - @Autowired - private SearchIndex searchIndex; - - @Autowired - private EntityAclService entityAclService; - - @Autowired - public EntityStore entityStore; - - @Autowired - public UserService userService; - - @Autowired - public EntityQueueService entityQueueService; - + private final EntityQueryService entityQueryService; + private final SearchIndex searchIndex; + private final EntityAclService entityAclService; + public final EntityStore entityStore; + public final UserService userService; + public final EntityQueueService entityQueueService; + + public EntityDbRestApiController(EntityQueryService entityQueryService, SearchIndex searchIndex, EntityAclService entityAclService, EntityStore entityStore, UserService userService, EntityQueueService entityQueueService) { + this.entityQueryService = entityQueryService; + this.searchIndex = searchIndex; + this.entityAclService = entityAclService; + this.entityStore = entityStore; + this.userService = userService; + this.entityQueueService = entityQueueService; + } + /** * Gets the status. * @return The {@link Status status}. @@ -138,7 +136,7 @@ public List notifications( /** * Gets the continuous queries for a user identified by the API key. * @param authorization The user's API key. - * @return A list of {@link ContinuousQueries queries}. + * @return A list of continuous queries. */ @RequestMapping(value = "/api/user/continuousqueries", method = RequestMethod.GET) @ResponseStatus(HttpStatus.OK) @@ -160,14 +158,11 @@ public List continuousQueries( /** * Queues an entities for ingest. - * @param entities A collection of {@link Entities}. + * @param entities A collection of entities. * @param acl An optional ACL for all entities. If not specified all entities * are visible to all users. * @param authorization The user's API key. - * @throws MalformedAclException Thrown if the ACL is invalid. - * @throws Exception Thrown if the entities cannot be queued for ingestion. - * Check the server log for more information. - */ + */ @RequestMapping(value = "/api/entity", method = {RequestMethod.PUT, RequestMethod.POST}) @ResponseStatus(HttpStatus.OK) public void store( @@ -198,8 +193,6 @@ public void store( * @param entityId The ID of the entity. * @param acl The updated ACL for the entity. * @param authorization The user's API key. - * @throws NonexistantEntityException Thrown if the entity having the entity ID does not exist. - * @throws MalformedAclException Thrown if the ACL is invalid. * @throws InternalServerErrorException Thrown if the entity's ACL cannot * be updated for other reasons. Check the server log for more information on the cause. */ @@ -207,7 +200,7 @@ public void store( @ResponseStatus(HttpStatus.OK) public void store( @PathVariable String entityId, - @RequestParam(value="acl", required=true) String acl, + @RequestParam(value="acl") String acl, @RequestHeader(value="Authorization") String authorization) { try { @@ -265,7 +258,7 @@ public ResponseEntity eql( status = HttpStatus.CREATED; } - return new ResponseEntity(queryResult, status); + return new ResponseEntity<>(queryResult, status); } catch (QueryExecutionException ex) { diff --git a/entitydb-app/src/main/java/ai/philterd/entitydb/EntityDbApplication.java b/entitydb-app/src/main/java/ai/philterd/entitydb/EntityDbApplication.java index d5d8d99..45d1e80 100644 --- a/entitydb-app/src/main/java/ai/philterd/entitydb/EntityDbApplication.java +++ b/entitydb-app/src/main/java/ai/philterd/entitydb/EntityDbApplication.java @@ -110,7 +110,7 @@ public static void main(String[] args) { protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(EntityDbApplication.class); } - + @Bean(destroyMethod = "shutdown") public ThreadPoolExecutor getThreadPoolExecutor() { @@ -122,298 +122,298 @@ public ThreadPoolExecutor getThreadPoolExecutor() { return executor; } - + @Bean public Indexer getIndexer() { - + return new ElasticSearchIndexer(getSearchIndex(), getEntityStore(), getIndexerCache(), properties.getIndexerBatchSize()); - + } - + @Bean public AuditLogger getAuditLogger() { - + AuditLogger auditLogger = null; - + if(properties.isAuditEnabled()) { - + final String auditLoggerType = properties.getAuditLogger(); - + try { - + if("tempfile".equalsIgnoreCase(auditLoggerType)) { - + auditLogger = new FileAuditLogger(getSystemId()); - + } else if("fluentd".equalsIgnoreCase(auditLoggerType)) { - + auditLogger = new FluentdAuditLogger(getSystemId()); - + } else { - + LOGGER.warn("Invalid value for audit logger."); auditLogger = new FileAuditLogger(getSystemId()); - + } - + } catch (IOException ex) { - + LOGGER.error("Unable to initialize audit logger.", ex); - + } - + } else { - + LOGGER.info("Auditing is disabled. Audit events will be directed to a temporary file and discarded."); auditLogger = new FluentdAuditLogger(getSystemId()); - + } - + return auditLogger; - + } - + @Bean public EntityStore getEntityStore() { - + EntityStore entityStore = null; - + final String entitydb = properties.getDatabase(); - + LOGGER.info("Using database: {}", entitydb); - + try { - + if(StringUtils.equalsIgnoreCase(EntityDbProperties.MYSQL, entitydb)) { - + entityStore = RdbmsEntityStore.createMySQL5EntityStore(properties.getMySqlJdbcURL(), properties.getMySqlUsername(), properties.getMySqlPassword(), "validate"); - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.DYNAMODB, entitydb)) { - + if(StringUtils.isNotEmpty(properties.getDynamoDBAccessKey())) { - - entityStore = new DynamoDBEntityStore(properties.getDynamoDBAccessKey(), properties.getDynamoDBSecretKey(), properties.getDynamoDBEndpoint(), properties.getDynamoDBTable()); - + + entityStore = new DynamoDBEntityStore(properties.getDynamoDBAccessKey(), properties.getDynamoDBSecretKey(), properties.getDynamoDBEndpoint(), properties.getDynamoDBTable()); + } else { - + entityStore = new DynamoDBEntityStore(properties.getDynamoDBEndpoint(), properties.getDynamoDBTable()); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.MONGODB, entitydb)) { - + final String mongoDbHost = properties.getMongoDBHost(); final int mongoDbPort = properties.getMongoDBPort(); final String mongoDbDatabase = properties.getMongoDBDatabase(); final String mongoDbCollection = properties.getMongoDBCollection(); final String mongoDbUsername = properties.getMongoDBUsername(); final String mongoDbPassword = properties.getMongoDBPassword(); - + if(StringUtils.isEmpty(mongoDbUsername)) { - + entityStore = new MongoDBEntityStore(mongoDbHost, mongoDbPort, mongoDbDatabase, mongoDbCollection); - + } else { - + entityStore = new MongoDBEntityStore(mongoDbHost, mongoDbPort, mongoDbUsername, mongoDbPassword, mongoDbDatabase, mongoDbCollection); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.INTERNAL, entitydb)) { - + LOGGER.warn("A temporary, internal entity store will be used. Its contents are not retained across restarts."); - + entityStore = RdbmsEntityStore.createHypersonicEntityStore("jdbc:hsqldb:mem:entitydb-entity-store", "sa", "", "create-drop"); - + } else { - + LOGGER.warn("Invalid database selection: {}", entitydb); LOGGER.warn("A temporary, internal entity store will be used. Its contents are not retained across restarts."); - + // Use an internal database. entityStore = RdbmsEntityStore.createHypersonicEntityStore("jdbc:hsqldb:mem:entitydb", "sa", "", "create-drop"); - + } - + } catch (Exception ex) { - + LOGGER.error("Unable to connect to entity store. Please check your credentials and connection information.", ex); - + } - + return entityStore; - + } - + @Bean public SearchIndex getSearchIndex() { - + SearchIndex searchIndex = null; - + try { - + if(StringUtils.equalsIgnoreCase(EntityDbProperties.INTERNAL, properties.getSearchIndexProvider())) { - + final EmbeddedElasticsearchServer embeddedElasticsearchServer = new EmbeddedElasticsearchServer(); embeddedElasticsearchServer.start(); - + LOGGER.warn("Using the internal search index is not recommended for production systems."); - + searchIndex = new ElasticSearchIndex("http://localhost:9200/"); - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.ELASTICSEARCH, properties.getSearchIndexProvider())) { - + if(StringUtils.isEmpty(properties.getElasticsearchUsername())) { - + searchIndex = new ElasticSearchIndex(properties.getElasticsearchHost()); - + } else { - + searchIndex = new ElasticSearchIndex(properties.getElasticsearchHost(), properties.getElasticsearchUsername(), properties.getElasticsearchPassword()); - + } - + } else { - + LOGGER.warn("Invalid search index: {}", properties.getSearchIndexProvider()); LOGGER.warn("Using the internal search index is not recommended for production systems."); - + } - + } catch (IOException ex) { - + LOGGER.error("Unable to configure Elasticsearch client.", ex); - + } - + return searchIndex; - + } - + @Bean public List getRulesEngines() { - + List rulesEngines = new LinkedList(); - + if(properties.isRulesEngineEnabled()) { - + final String rulesDirectory = properties.getRulesDirectory(); - + try { - - rulesEngines.add(new DroolsRulesEngine(rulesDirectory)); + + rulesEngines.add(new DroolsRulesEngine(rulesDirectory)); rulesEngines.add(new XmlRulesEngine(rulesDirectory)); - + } catch (RulesEngineException ex) { - + LOGGER.error("Unable to initialize the rules engine.", ex); - + } - + } else { - + LOGGER.info("The rules engine is disabled."); - + } - + return rulesEngines; - + } - + @Bean public QueuePublisher getQueuePublisher() { - + QueuePublisher queuePublisher = null; - + String queue = properties.getQueueProvider(); - + if(StringUtils.equalsIgnoreCase(EntityDbProperties.SQS, queue)) { - + LOGGER.info("Using SQS queue: {}", properties.getSqsQueueUrl()); - + if(StringUtils.isNotEmpty(properties.getSqsAccessKey())) { - + queuePublisher = new SqsQueuePublisher(properties.getSqsQueueUrl(), properties.getSqsEndpoint(), properties.getSqsAccessKey(), properties.getSqsSecretKey(), getMetricReporter()); - + } else { - + queuePublisher = new SqsQueuePublisher(properties.getSqsQueueUrl(), properties.getSqsEndpoint(), getMetricReporter()); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.ACTIVEMQ, queue)) { - + LOGGER.info("Using ActiveMQ queue."); - + try { - + queuePublisher = new ActiveMQQueuePublisher(properties.getActiveMQBrokerUrl(), properties.getActiveMQQueueName(), getMetricReporter()); - + } catch (Exception ex) { - + LOGGER.error("Unable to initialize ActiveMQ queue publisher."); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.INTERNAL, queue)) { - + LOGGER.info("Using internal queue."); - + queuePublisher = new InternalQueuePublisher(getMetricReporter()); - + } else { - + LOGGER.warn("Invalid queue {}. Using internal queue.", queue); - + queuePublisher = new InternalQueuePublisher(getMetricReporter()); - + } - + return queuePublisher; - + } - + @Bean public QueueConsumer getQueueConsumer() { QueueConsumer queueConsumer = null; final String queue = properties.getQueueProvider(); - + if(StringUtils.equalsIgnoreCase(EntityDbProperties.SQS, queue)) { - + LOGGER.info("Using SQS queue {}.", properties.getSqsQueueUrl()); - + if(StringUtils.isNotEmpty(properties.getSqsAccessKey())) { - + queueConsumer = new SqsQueueConsumer(getEntityStore(), getRulesEngines(), getAuditLogger(), getMetricReporter(), properties.getSqsEndpoint(), properties.getSqsQueueUrl(), properties.getSqsAccessKey(), properties.getSqsSecretKey(), properties.getSqsVisibilityTimeout(), getIndexerCache()); - + } else { - + queueConsumer = new SqsQueueConsumer(getEntityStore(), getRulesEngines(), getAuditLogger(), getMetricReporter(), properties.getSqsEndpoint(), properties.getSqsQueueUrl(), properties.getSqsVisibilityTimeout(), getIndexerCache()); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.ACTIVEMQ, queue)) { - + LOGGER.info("Using ActiveMQ queue."); - + try { - + queueConsumer = new ActiveMQQueueConsumer(getEntityStore(), getRulesEngines(), getAuditLogger(), getMetricReporter(), properties.getActiveMQBrokerUrl(), properties.getActiveMQQueueName(), properties.getActiveMQBrokerTimeout(), getIndexerCache()); - + } catch (Exception ex) { - + LOGGER.error("Unable to initialize ActiveMQ queue consumer."); - + } - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.INTERNAL, queue)) { - + LOGGER.info("Using internal queue."); - + queueConsumer = new InternalQueueConsumer(getEntityStore(), getRulesEngines(), getAuditLogger(), getMetricReporter(), getIndexerCache()); - + } else { LOGGER.warn("Invalid queue {}. Using the internal queue.", queue); @@ -421,41 +421,41 @@ public QueueConsumer getQueueConsumer() { queueConsumer = new InternalQueueConsumer(getEntityStore(), getRulesEngines(), getAuditLogger(), getMetricReporter(), getIndexerCache()); } - - return queueConsumer; - + + return queueConsumer; + } - + @Bean public MetricReporter getMetricReporter() { - + if(StringUtils.equalsIgnoreCase(EntityDbProperties.INFLUXDB, properties.getMetricsProvider())) { - + LOGGER.info("Using InfluxDB at {} and database {}.", properties.getInfluxDbDatabase(), properties.getInfluxDbDatabase()); - + return new InfluxDbMetricReporter(properties.getInfluxDbEndpoint(), properties.getInfluxDbDatabase(), properties.getInfluxDbUsername(), properties.getInfluxDbPassword()); - + } else if(StringUtils.equalsIgnoreCase(EntityDbProperties.CLOUDWATCH, properties.getMetricsProvider())) { - + if(StringUtils.isNotEmpty(properties.getCloudWatchAccessKey())) { - - return new CloudWatchMetricReporter(getSystemId(), properties.getCloudWatchNamespace(), + + return new CloudWatchMetricReporter(getSystemId(), properties.getCloudWatchNamespace(), properties.getCloudWatchAccessKey(), properties.getCloudWatchSecretKey(), properties.getCloudWatchEndpoint()); - + } else { - + return new CloudWatchMetricReporter(getSystemId(), properties.getCloudWatchNamespace(), properties.getCloudWatchEndpoint()); - + } - + } else { - + return new DefaultMetricReporter(); } - + } - + @Bean public HttpMessageConverters customConverters() { @@ -472,25 +472,25 @@ public HttpMessageConverters customConverters() { public ConcurrentLinkedQueue getIndexerCache() { return new ConcurrentLinkedQueue<>(); - + } @Bean public CacheManager cacheManager() { - + LOGGER.info("Creating cache manager."); final List cacheNames = new LinkedList<>(); - + cacheNames.add("nonExpiredContinuousQueries"); cacheNames.add("continuousQueriesByUser"); cacheNames.add("indexer"); cacheNames.add("general"); - + CacheManager cacheManager = null; - + if(StringUtils.equalsIgnoreCase(properties.getCache(), "memcached")) { - + try { final MemcachedClient memcachedClient = new MemcachedClient( @@ -498,58 +498,58 @@ public CacheManager cacheManager() { .setProtocol(ConnectionFactoryBuilder.Protocol.BINARY) .build(), AddrUtil.getAddresses(properties.getMemcachedHosts())); - + LOGGER.info("Created Memcached client for {}.", properties.getMemcachedHosts()); - + final Collection caches = new ArrayList(); - + for(final String cacheName : cacheNames) { caches.add(new MemcachedCache(memcachedClient, cacheName, properties.getCacheTtl())); } - + return new MemcachedCacheManager(caches); - + } catch (IOException ex) { - + LOGGER.error("Unable to create memcached client.", ex); - + } - + } else { - + LOGGER.info("Using internal cache."); - + final SimpleCacheManager simpleCacheManager = new SimpleCacheManager(); final List caches = new LinkedList<>(); - + for(String cacheName : cacheNames) { caches.add(new ConcurrentMapCache(cacheName)); } - + simpleCacheManager.setCaches(caches); simpleCacheManager.afterPropertiesSet(); - + return cacheManager; } - + return null; - + } - + private String getSystemId() { - + String systemId = properties.getSystemId(); - + if(StringUtils.isEmpty(systemId)) { - + systemId = MetricUtils.getSystemId(); - + } - + return systemId; - + } } \ No newline at end of file diff --git a/entitydb-app/src/main/resources/log4j2.properties b/entitydb-app/src/main/resources/log4j2.properties index ed4a1bb..682e4e7 100644 --- a/entitydb-app/src/main/resources/log4j2.properties +++ b/entitydb-app/src/main/resources/log4j2.properties @@ -1,7 +1,5 @@ -# The root logger with appender name rootLogger = DEBUG, STDOUT -# Assign STDOUT a valid appender & define its layout appender.console.name = STDOUT appender.console.type = Console appender.console.layout.type = PatternLayout