Skip to content

Commit

Permalink
Merge branch 'master' into feat/Add-telemetry-to-events-actions
Browse files Browse the repository at this point in the history
  • Loading branch information
maxiadlovskii authored Jan 10, 2025
2 parents 6b639fc + 4ad2b56 commit b8bdbf5
Show file tree
Hide file tree
Showing 16 changed files with 118 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,100 +17,81 @@
package org.graylog.plugins.pipelineprocessor.db.mongodb;

import com.google.common.collect.ImmutableSet;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import jakarta.inject.Inject;
import org.graylog.plugins.pipelineprocessor.db.PipelineStreamConnectionsService;
import org.graylog.plugins.pipelineprocessor.events.PipelineConnectionsChangedEvent;
import org.graylog.plugins.pipelineprocessor.rest.PipelineConnections;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.utils.MongoUtils;
import org.graylog2.events.ClusterEventBus;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;
import org.mongojack.WriteResult;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.in;
import static org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.getRateLimitedLog;

public class MongoDbPipelineStreamConnectionsService implements PipelineStreamConnectionsService {
private static final RateLimitedLog log = getRateLimitedLog(MongoDbPipelineStreamConnectionsService.class);

private static final String COLLECTION = "pipeline_processor_pipelines_streams";

private final JacksonDBCollection<PipelineConnections, String> dbCollection;
private final ClusterEventBus clusterBus;
private final MongoCollection<PipelineConnections> collection;
private final MongoUtils<PipelineConnections> mongoUtils;

@Inject
public MongoDbPipelineStreamConnectionsService(MongoConnection mongoConnection,
MongoJackObjectMapperProvider mapper,
ClusterEventBus clusterBus) {
this.dbCollection = JacksonDBCollection.wrap(
mongoConnection.getDatabase().getCollection(COLLECTION),
PipelineConnections.class,
String.class,
mapper.get());
public MongoDbPipelineStreamConnectionsService(MongoCollections mongoCollections, ClusterEventBus clusterBus) {
this.clusterBus = clusterBus;
dbCollection.createIndex(DBSort.asc("stream_id"), new BasicDBObject("unique", true));
this.collection = mongoCollections.collection(COLLECTION, PipelineConnections.class);
this.mongoUtils = mongoCollections.utils(collection);

collection.createIndex(Indexes.ascending("stream_id"), new IndexOptions().unique(true));
}

@Override
public PipelineConnections save(PipelineConnections connections) {
PipelineConnections existingConnections = dbCollection.findOne(DBQuery.is("stream_id", connections.streamId()));
PipelineConnections existingConnections = collection.find(eq("stream_id", connections.streamId()))
.first();
if (existingConnections == null) {
existingConnections = PipelineConnections.create(null, connections.streamId(), Collections.emptySet());
}

final PipelineConnections toSave = existingConnections.toBuilder()
.pipelineIds(connections.pipelineIds()).build();
final WriteResult<PipelineConnections, String> save = dbCollection.save(toSave);

final PipelineConnections savedConnections = save.getSavedObject();
final PipelineConnections savedConnections = mongoUtils.save(toSave);
clusterBus.post(PipelineConnectionsChangedEvent.create(savedConnections.streamId(), savedConnections.pipelineIds()));

return savedConnections;
}

@Override
public PipelineConnections load(String streamId) throws NotFoundException {
final PipelineConnections oneById = dbCollection.findOne(DBQuery.is("stream_id", streamId));
final PipelineConnections oneById = collection.find(eq("stream_id", streamId)).first();
if (oneById == null) {
throw new NotFoundException("No pipeline connections with for stream " + streamId);
throw new NotFoundException("No pipeline connections for stream " + streamId);
}
return oneById;
}

@Override
public Set<PipelineConnections> loadAll() {
try (DBCursor<PipelineConnections> connections = dbCollection.find()) {
return ImmutableSet.copyOf((Iterable<PipelineConnections>) connections);
} catch (MongoException e) {
log.error("Unable to load pipeline connections", e);
return Collections.emptySet();
}
return ImmutableSet.copyOf(collection.find());
}

@Override
public Set<PipelineConnections> loadByPipelineId(String pipelineId) {
// Thanks, MongoJack!
// https://github.com/mongojack/mongojack/issues/12
final DBObject query = new BasicDBObject("pipeline_ids", new BasicDBObject("$in", Collections.singleton(pipelineId)));
try (DBCursor<PipelineConnections> pipelineConnections = dbCollection.find(query)) {
return ImmutableSet.copyOf((Iterable<PipelineConnections>) pipelineConnections);
} catch (MongoException e) {
log.error("Unable to load pipeline connections for pipeline ID " + pipelineId, e);
return Collections.emptySet();
}
return ImmutableSet.copyOf(collection.find(in("pipeline_ids", pipelineId)));
}

@Override
Expand All @@ -119,16 +100,17 @@ public void delete(String streamId) {
final PipelineConnections connections = load(streamId);
final Set<String> pipelineIds = connections.pipelineIds();

dbCollection.removeById(connections.id());
mongoUtils.deleteById(connections.id());
clusterBus.post(PipelineConnectionsChangedEvent.create(streamId, pipelineIds));
} catch (NotFoundException e) {
log.debug("No connections found for stream " + streamId);
log.debug("No connections found for stream {}", streamId);
}
}

@Override
public Map<String, PipelineConnections> loadByStreamIds(Collection<String> streamIds) {
return dbCollection.find(DBQuery.in("stream_id", streamIds)).toArray().stream()
.collect(Collectors.toMap(PipelineConnections::streamId, conn -> conn));
try (final var stream = MongoUtils.stream(collection.find(in("stream_id", streamIds)))) {
return stream.collect(Collectors.toMap(PipelineConnections::streamId, conn -> conn));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog2.database.BuildableMongoEntity;
import org.graylog2.database.MongoEntity;
import org.mongojack.Id;
import org.mongojack.ObjectId;

Expand All @@ -28,7 +30,8 @@

@AutoValue
@JsonAutoDetect
public abstract class PipelineConnections {
public abstract class PipelineConnections implements MongoEntity,
BuildableMongoEntity<PipelineConnections, PipelineConnections.Builder> {

@JsonProperty("id")
@Nullable
Expand Down Expand Up @@ -60,7 +63,7 @@ public static Builder builder() {
public abstract Builder toBuilder();

@AutoValue.Builder
public abstract static class Builder {
public abstract static class Builder implements BuildableMongoEntity.Builder<PipelineConnections, Builder> {
public abstract PipelineConnections build();

public abstract Builder id(String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,92 +16,63 @@
*/
package org.graylog.plugins.pipelineprocessor.rulebuilder.db;

import com.google.common.collect.ImmutableSet;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoException;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import org.bson.types.ObjectId;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.mongojack.DBCursor;
import org.mongojack.DBQuery;
import org.mongojack.DBSort;
import org.mongojack.JacksonDBCollection;

import com.google.common.collect.ImmutableList;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.model.Sorts;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.utils.MongoUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

import static org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.getRateLimitedLog;
import static com.mongodb.client.model.Filters.eq;

@Singleton
public class MongoDBRuleFragmentService implements RuleFragmentService {

private static final RateLimitedLog log = getRateLimitedLog(MongoDBRuleFragmentService.class);

public static final String COLLECTION_NAME = "rule_fragments";

private final JacksonDBCollection<RuleFragment, ObjectId> dbCollection;
private final MongoCollection<RuleFragment> collection;
private final MongoUtils<RuleFragment> mongoUtils;

@Inject
public MongoDBRuleFragmentService(
final MongoJackObjectMapperProvider objectMapperProvider,
final MongoConnection mongoConnection
) {
this(JacksonDBCollection.wrap(
mongoConnection.getDatabase().getCollection(COLLECTION_NAME),
RuleFragment.class,
ObjectId.class,
objectMapperProvider.get())
);
}


public MongoDBRuleFragmentService(JacksonDBCollection<RuleFragment, ObjectId> dbCollection) {
this.dbCollection = Objects.requireNonNull(dbCollection);

this.dbCollection.createIndex(new BasicDBObject("name", 1), new BasicDBObject("unique", true));
public MongoDBRuleFragmentService(MongoCollections mongoCollections) {
collection = mongoCollections.collection(COLLECTION_NAME, RuleFragment.class);
mongoUtils = mongoCollections.utils(collection);
collection.createIndex(Indexes.ascending("name"), new IndexOptions().unique(true));
}

@Override
public RuleFragment save(RuleFragment ruleFragment) {
return dbCollection.save(ruleFragment).getSavedObject();
return mongoUtils.save(ruleFragment);
}

@Override
public void delete(String name) {
dbCollection.remove(DBQuery.is("name", name));
collection.deleteOne(eq("name", name));
}

@Override
public void deleteAll() {
dbCollection.remove(DBQuery.empty());
collection.deleteMany(Filters.empty());
}


@Override
public long count(String name) {
return dbCollection.getCount(DBQuery.is("name", name));
return collection.countDocuments(eq("name", name));
}

@Override
public Optional<RuleFragment> get(String name) {
return Optional.ofNullable(dbCollection.findOne(DBQuery.is("name", name)));
return Optional.ofNullable(collection.find(eq("name", name)).first());
}

@Override
public Collection<RuleFragment> all() {
try (DBCursor<RuleFragment> ruleDaos = dbCollection.find().sort(DBSort.asc("title"))) {
return ImmutableSet.copyOf((Iterable<RuleFragment>) ruleDaos);
} catch (MongoException e) {
log.error("Unable to load rule fragments", e);
return Collections.emptySet();
}
return ImmutableList.copyOf(collection.find().sort(Sorts.ascending("title")));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.auto.value.AutoValue;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog2.database.BuildableMongoEntity;
import org.graylog2.database.MongoEntity;
import org.mongojack.Id;
import org.mongojack.ObjectId;

Expand All @@ -30,7 +32,7 @@

@AutoValue
@JsonIgnoreProperties(value = {"name"}, allowGetters = true)
public abstract class RuleFragment {
public abstract class RuleFragment implements MongoEntity, BuildableMongoEntity<RuleFragment, RuleFragment.Builder> {

public static final String FIELD_NAME = "name";
public static final String FIELD_FRAGMENT = "fragment";
Expand Down Expand Up @@ -63,14 +65,12 @@ public String getName() {
@JsonProperty(FIELD_DESCRIPTOR)
public abstract FunctionDescriptor descriptor();


public static Builder builder() {
return new AutoValue_RuleFragment.Builder().isCondition(false);
}


@AutoValue.Builder
public abstract static class Builder {
public abstract static class Builder implements BuildableMongoEntity.Builder<RuleFragment, Builder> {

public abstract Builder id(String id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
*
* @deprecated use {@link org.graylog2.database.MongoCollections} as an entrypoint for interacting with MongoDB.
*/
@Deprecated
@Deprecated(since = "6.2.0", forRemoval = true)
public class JacksonDBCollection<T, K> {

private final JacksonMongoCollection<T> delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.graylog2.contentpacks.model.entities.PipelineEntity;
import org.graylog2.contentpacks.model.entities.references.ValueReference;
import org.graylog2.database.MongoCollections;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.plugin.streams.Stream;
Expand Down Expand Up @@ -99,12 +98,12 @@ public class PipelineFacadeTest {
@Before
@SuppressForbidden("Using Executors.newSingleThreadExecutor() is okay in tests")
public void setUp() throws Exception {
final MongoConnection mongoConnection = mongodb.mongoConnection();
final MongoJackObjectMapperProvider mapperProvider = new MongoJackObjectMapperProvider(objectMapper);
final ClusterEventBus clusterEventBus = new ClusterEventBus("cluster-event-bus", Executors.newSingleThreadExecutor());

pipelineService = new MongoDbPipelineService(new MongoCollections(mapperProvider, mongoConnection), clusterEventBus);
connectionsService = new MongoDbPipelineStreamConnectionsService(mongoConnection, mapperProvider, clusterEventBus);
final MongoCollections mongoCollections = new MongoCollections(new MongoJackObjectMapperProvider(objectMapper),
mongodb.mongoConnection());
pipelineService = new MongoDbPipelineService(mongoCollections, clusterEventBus);
connectionsService = new MongoDbPipelineStreamConnectionsService(mongoCollections, clusterEventBus);

facade = new PipelineFacade(objectMapper, pipelineService, connectionsService, pipelineRuleParser, ruleService, streamService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ const WidgetFocusProvider = ({ children }: { children: React.ReactNode }): React
query,
);

history.replace(newURI);
if (newURI !== query) {
history.replace(newURI);
}
}, [history, query]);

const setWidgetFocusing = useCallback((widgetId: string) => updateFocusQueryParams({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
import { concatQueryStrings, escape } from './QueryHelper';
import { MISSING_BUCKET_NAME } from 'views/Constants';

import { concatQueryStrings, escape, predicate } from './QueryHelper';

describe('QueryHelper', () => {
it('quotes $ in values', () => {
Expand Down Expand Up @@ -52,4 +54,19 @@ describe('QueryHelper', () => {
expect(result).toEqual('field1:value1 OR field2:value2');
});
});

describe('predicate', () => {
it('creates simple field/value predicate for strings', () => {
expect(predicate('foo', 'bar')).toEqual('foo:bar');
});

it('creates simple field/value predicate for numbers', () => {
expect(predicate('foo', 23)).toEqual('foo:23');
});

it('treats missing value bucket correctly', () => {
expect(predicate('foo', MISSING_BUCKET_NAME)).toEqual('NOT _exists_:foo');
expect(predicate('foo', escape(MISSING_BUCKET_NAME))).toEqual('NOT _exists_:foo');
});
});
});
Loading

0 comments on commit b8bdbf5

Please sign in to comment.