diff --git a/.gitignore b/.gitignore index f6a874f..dabb1d8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,11 @@ !.gitignore TODO __pycache__ +*.DS_Store* *.egg-info/ /build/ /dist/ +/.idea/ +venv/ +env/ +transifex_input.yaml diff --git a/tutorevent_bus_redis/patches/local-docker-compose-services b/tutorevent_bus_redis/patches/local-docker-compose-services new file mode 100644 index 0000000..88b1f41 --- /dev/null +++ b/tutorevent_bus_redis/patches/local-docker-compose-services @@ -0,0 +1,114 @@ +{% if RUN_KAFKA_SERVER %} +# needed by Kafka to keep track of nodes, topics, and messages. +zookeeper: + image: confluentinc/cp-zookeeper:6.2.1 + restart: unless-stopped + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + +# Events broker +kafka: + image: confluentinc/cp-server:6.2.1 + depends_on: + - zookeeper + restart: unless-stopped + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:18081 + CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092 + CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 + CONFLUENT_METRICS_ENABLE: 'true' + CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' + +# storage layer for data schemas in Kafka +schema-registry: + image: confluentinc/cp-schema-registry:6.2.1 + depends_on: + - kafka 
+ restart: unless-stopped + ports: + - "18081:18081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:18081 +{% endif %} + +{% if RUN_KAFKA_SERVER and RUN_KAFKA_UI %} +# browser app for monitoring local Kafka cluster. This is quite memory- and CPU-intensive, so it should only be used for local Kafka debugging +kafka-control-center: + image: confluentinc/cp-enterprise-control-center:6.2.1 + depends_on: + - kafka + - schema-registry + restart: unless-stopped + ports: + - "9021:9021" + environment: + CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:29092 + CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:18081 + CONTROL_CENTER_REPLICATION_FACTOR: 1 + CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 + CONFLUENT_METRICS_TOPIC_REPLICATION: 1 + PORT: 9021 +{% endif %} + +{% if EVENT_BUS_BACKEND %} +# This is just a stub test for showing how we could run consumers +login-consumer: + image: docker.io/overhangio/openedx:17.0.2-nightly + environment: + SERVICE_VARIANT: lms + DJANGO_SETTINGS_MODULE: lms.envs.tutor.production + command: > + ./manage.py lms consume_events -t user-login -g user-activity-service {% if EVENT_BUS_BACKEND == "redis" %}--extra '{"consumer_name": "user-login-1"}'{% endif %} + restart: unless-stopped + volumes: + - ../apps/openedx/settings/lms:/openedx/edx-platform/lms/envs/tutor:ro + - ../apps/openedx/settings/cms:/openedx/edx-platform/cms/envs/tutor:ro + - ../apps/openedx/config:/openedx/config:ro + - ../../data/lms:/openedx/data + - ../../data/openedx-media:/openedx/media + depends_on: + - {% if EVENT_BUS_BACKEND == "redis" %}redis{% elif EVENT_BUS_BACKEND == "kafka" %}kafka{% endif %} + +# This is just a stub test for showing how we could run consumers +tracking-consumer: + image: docker.io/overhangio/openedx:17.0.2-nightly + environment: + SERVICE_VARIANT: lms + 
DJANGO_SETTINGS_MODULE: lms.envs.tutor.production + command: > + ./manage.py lms consume_events -t analytics -g analytics-service {% if EVENT_BUS_BACKEND == "redis" %}--extra '{"consumer_name": "analytics-1"}'{% endif %} + restart: unless-stopped + volumes: + - ../apps/openedx/settings/lms:/openedx/edx-platform/lms/envs/tutor:ro + - ../apps/openedx/settings/cms:/openedx/edx-platform/cms/envs/tutor:ro + - ../apps/openedx/config:/openedx/config:ro + - ../../data/lms:/openedx/data + - ../../data/openedx-media:/openedx/media + {%- for mount in iter_mounts(MOUNTS, "openedx", "lms") %} + - {{ mount }} + {%- endfor %} + depends_on: + - {% if EVENT_BUS_BACKEND == "redis" %}redis{% elif EVENT_BUS_BACKEND == "kafka" %}kafka{% endif %} +{% endif %} diff --git a/tutorevent_bus_redis/patches/openedx-common-settings b/tutorevent_bus_redis/patches/openedx-common-settings new file mode 100644 index 0000000..087844c --- /dev/null +++ b/tutorevent_bus_redis/patches/openedx-common-settings @@ -0,0 +1,42 @@ +# Backend independent settings for event production +EVENT_BUS_PRODUCER_CONFIG = {{ EVENT_BUS_PRODUCER_CONFIG }} +SEND_CATALOG_INFO_SIGNAL = {{ EVENT_BUS_SEND_CATALOG_INFO_SIGNAL }} + + +{% if EVENT_BUS_BACKEND == 'redis' %} +# redis connection url +# https://redis.readthedocs.io/en/stable/examples/ssl_connection_examples.html#Connecting-to-a-Redis-instance-via-a-URL-string +EVENT_BUS_REDIS_CONNECTION_URL = "{{ EVENT_BUS_REDIS_CONNECTION_URL }}" +EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_REDIS_TOPIC_PREFIX }}" +EVENT_BUS_PRODUCER = "{{ EVENT_BUS_REDIS_PRODUCER }}" +EVENT_BUS_CONSUMER = "{{ EVENT_BUS_REDIS_CONSUMER }}" +{% if EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT %} +EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT = int("{{ EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT }}") +{% endif %} +EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT = int("{{ EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT }}") +EVENT_BUS_REDIS_STREAM_MAX_LEN = int("{{ EVENT_BUS_REDIS_STREAM_MAX_LEN }}") +{% endif %} 
+ +{% if EVENT_BUS_BACKEND == 'kafka' %} +EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = "{{ EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL }}" +EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = "{{ EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS }}" +EVENT_BUS_PRODUCER = "{{ EVENT_BUS_KAFKA_PRODUCER }}" +EVENT_BUS_CONSUMER = "{{ EVENT_BUS_KAFKA_CONSUMER }}" +EVENT_BUS_TOPIC_PREFIX = "{{ EVENT_BUS_KAFKA_TOPIC_PREFIX }}" +{% endif %} + +{% if EVENT_BUS_TRACKING_LOGS %} +SEND_TRACKING_EVENT_EMITTED_SIGNAL = True + +# Update the backends to use the event bus +EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend" +EVENT_TRACKING_BACKENDS["caliper"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend" + +# Update backend to send events in sync mode +EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter" +EVENT_TRACKING_BACKENDS["caliper"]["OPTIONS"]["backends"]["caliper"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter" + +# Remove caliper from the tracking backends to prevent double-event-emission +EVENT_TRACKING_BACKENDS.pop("caliper") + +{% endif %} diff --git a/tutorevent_bus_redis/plugin.py b/tutorevent_bus_redis/plugin.py index 18b6e66..d53bba0 100644 --- a/tutorevent_bus_redis/plugin.py +++ b/tutorevent_bus_redis/plugin.py @@ -13,6 +13,26 @@ ######################################## # CONFIGURATION ######################################## +# FIXME: Update this to a saner config structure less likely to break, and able +# to activate and deactivate individual events more easily. 
+PRODUCER_CONFIG = { + 'org.openedx.content_authoring.xblock.published.v1': { + 'content-authoring-xblock-lifecycle': + {'event_key_field': 'xblock_info.usage_key', 'enabled': False}, + 'content-authoring-xblock-published': + {'event_key_field': 'xblock_info.usage_key', 'enabled': False}, + }, + 'org.openedx.content_authoring.xblock.deleted.v1': { + 'content-authoring-xblock-lifecycle': + {'event_key_field': 'xblock_info.usage_key', 'enabled': False}, + }, + 'org.openedx.learning.auth.session.login.completed.v1': { + 'user-login': {'event_key_field': 'user.pii.username', 'enabled': False}, + }, + 'org.openedx.analytics.tracking.event.emitted.v1': { + 'analytics': {'event_key_field': 'tracking_log.name', 'enabled': True} + }, +} hooks.Filters.CONFIG_DEFAULTS.add_items( [ @@ -20,6 +40,81 @@ # Each new setting is a pair: (setting_name, default_value). # Prefix your setting names with 'EVENT_BUS_REDIS_'. ("EVENT_BUS_REDIS_VERSION", __version__), + + # Possible values are "kafka", "redis", or None to disable the + # event bus + ("EVENT_BUS_BACKEND", "redis"), + + # Settings for producing events + ("EVENT_BUS_SEND_CATALOG_INFO_SIGNAL", True), + ("EVENT_BUS_TRACKING_LOGS", True), + ( + # FIXME: We should only install the one that's configured + "OPENEDX_EXTRA_PIP_REQUIREMENTS", + [ + "edx-event-bus-redis==0.3.3", + "edx-event-bus-kafka==v5.6.0", + "openedx-events==v9.5.1", + "confluent_kafka[avro,schema-registry]", + "git+https://github.com/openedx/platform-plugin-aspects.git@bmtcril/tracking_event_command", + ], + ), + ("EVENT_BUS_PRODUCER_CONFIG", PRODUCER_CONFIG), + + ###################################### + # redis backend settings + + # If true, this will run a separate instance of redis just for the + # event bus to prevent resource conflicts with other services + # TODO: Implement this + # ("RUN_DEDICATED_REDIS_BUS_SERVER", True), + + # Prefix for topics sent over the event bus + ("EVENT_BUS_REDIS_TOPIC_PREFIX", "openedx"), + + # Producer class which can send 
events to redis streams. + ("EVENT_BUS_REDIS_PRODUCER", "edx_event_bus_redis.create_producer"), + + # Consumer class which can consume events from redis streams. + ("EVENT_BUS_REDIS_CONSUMER", "edx_event_bus_redis.RedisEventConsumer"), + + # If the consumer encounters this many consecutive errors, exit with an error. This is intended to be used in a + # context where a management system (such as Kubernetes) will relaunch the consumer automatically. + # The default of 0 leaves this setting unset (equivalent to "None"), which means the consumer will never relaunch. + ("EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT", 0), + + # How long the consumer should wait for new entries in a stream. + # As we are running the consumer in a while True loop, changing this setting doesn't make much difference except + # for changing the number of monitoring messages while waiting for new events. + # https://redis.io/commands/xread/#blocking-for-data + ("EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT", 60), + + # Limits stream size to approximately this number + ("EVENT_BUS_REDIS_STREAM_MAX_LEN", 10_000), + + ###################################### + # Kafka backend settings + # TODO: Move hard coded settings from local-docker-compose-services here + # Version of https://github.com/openedx/event-bus-kafka to install + # This is what follows 'pip install' so you can use official versions + # or install from forks / branches / PRs here + ("EVENT_BUS_KAFKA_RELEASE", "edx-event-bus-kafka=='v5.6.0'"), + + # This will run schema-manager, zookeeper and kafka. Set to False if you + # are using a 3rd party to host Kafka or managing it outside of Tutor. + ("RUN_KAFKA_SERVER", False), + + # This will run kafka-control-center. This consumes a lot of resources, + # you can turn it off separately from the required services. Requires + # RUN_KAFKA_SERVER to be True as well. 
+ ("RUN_KAFKA_UI", False), + + ("EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL", "http://schema-registry:18081"), + ("EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"), + ("EVENT_BUS_KAFKA_PRODUCER", "edx_event_bus_kafka.create_producer"), + ("EVENT_BUS_KAFKA_CONSUMER", "edx_event_bus_kafka.KafkaEventConsumer"), + ("EVENT_BUS_KAFKA_TOPIC_PREFIX", "dev"), + ("EVENT_BUS_REDIS_CONNECTION_URL", "redis://{% if REDIS_USERNAME and REDIS_PASSWORD %}{{ REDIS_USERNAME }}:{{ REDIS_PASSWORD }}{% endif %}@{{ REDIS_HOST }}:{{ REDIS_PORT }}/5") ] )