Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker changes #1

Open
wants to merge 102 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
07d8059
Complete cicd
Feb 14, 2024
008e15d
Update ci.yml
mahdigheidi Feb 14, 2024
b5ed58b
debug
Feb 14, 2024
38bcd57
Fix deploy stage pipeline
Feb 14, 2024
fb5e853
Fix static code issues
Feb 14, 2024
6e80c2c
fix broker dockerfile
Feb 14, 2024
0a4fa8e
Fix broker run
Feb 14, 2024
7bff65c
Fix import dependencies
Feb 14, 2024
8d8353d
remove cast to int.
hoseinaghaei Feb 14, 2024
88d0517
fix data path.
hoseinaghaei Feb 14, 2024
694f0b9
fix path.
hoseinaghaei Feb 14, 2024
cf556fd
add partition_count to log.
hoseinaghaei Feb 14, 2024
824edc6
remove log.
hoseinaghaei Feb 14, 2024
b035b1d
fix partition count.
hoseinaghaei Feb 14, 2024
b319786
remove logs.
hoseinaghaei Feb 14, 2024
a6d39b2
add logs.
hoseinaghaei Feb 15, 2024
512fbd4
add logs.
hoseinaghaei Feb 15, 2024
0e3df39
add logs.
hoseinaghaei Feb 15, 2024
2bc2687
use singleton on subscribers.
hoseinaghaei Feb 15, 2024
270f8c2
add log on error.
hoseinaghaei Feb 15, 2024
0c14213
add log on error.
hoseinaghaei Feb 15, 2024
b63f9f0
add log on error.
hoseinaghaei Feb 15, 2024
021d2b4
get subscriptions on each req.
hoseinaghaei Feb 15, 2024
f952ffc
change write style
hoseinaghaei Feb 15, 2024
c296665
change to string.
hoseinaghaei Feb 15, 2024
84c712f
sync replica up.
hoseinaghaei Feb 15, 2024
187571f
sync replica up.
hoseinaghaei Feb 15, 2024
19e2ca0
log.
hoseinaghaei Feb 15, 2024
6f98d04
log.
hoseinaghaei Feb 15, 2024
836eb4a
fix none replica.
hoseinaghaei Feb 15, 2024
ff8a59b
fix none replica.
hoseinaghaei Feb 15, 2024
d1ca202
fix none replica.
hoseinaghaei Feb 15, 2024
74646af
fix none replica.
hoseinaghaei Feb 15, 2024
3a76232
some logs.
hoseinaghaei Feb 15, 2024
a21e49a
fix error.
hoseinaghaei Feb 15, 2024
f230513
logs.
hoseinaghaei Feb 15, 2024
0cb9c7a
write without replica.
hoseinaghaei Feb 15, 2024
5b80a5c
fix.
hoseinaghaei Feb 15, 2024
7ef5997
Complete cicd
Feb 14, 2024
1df50b7
Update ci.yml
mahdigheidi Feb 14, 2024
59381d8
debug
Feb 14, 2024
e489a37
Fix deploy stage pipeline
Feb 14, 2024
b63ab32
Fix static code issues
Feb 14, 2024
b93cdda
fix broker dockerfile
Feb 14, 2024
dd3acfd
Fix broker run
Feb 14, 2024
a4ad460
Fix import dependencies
Feb 14, 2024
30ea29e
fix data path.
hoseinaghaei Feb 14, 2024
5d366a1
fix path.
hoseinaghaei Feb 14, 2024
4e10250
add partition_count to log.
hoseinaghaei Feb 14, 2024
7d454e5
remove log.
hoseinaghaei Feb 14, 2024
6bb2467
fix partition count.
hoseinaghaei Feb 14, 2024
022a1bf
remove logs.
hoseinaghaei Feb 14, 2024
12c54cc
add logs.
hoseinaghaei Feb 15, 2024
3fbc14e
add logs.
hoseinaghaei Feb 15, 2024
0035549
add logs.
hoseinaghaei Feb 15, 2024
403705a
use singleton on subscribers.
hoseinaghaei Feb 15, 2024
7097c84
add log on error.
hoseinaghaei Feb 15, 2024
b255056
add log on error.
hoseinaghaei Feb 15, 2024
cdb401f
add log on error.
hoseinaghaei Feb 15, 2024
f9ad75d
get subscriptions on each req.
hoseinaghaei Feb 15, 2024
173512f
change write style
hoseinaghaei Feb 15, 2024
2dd9e8e
change to string.
hoseinaghaei Feb 15, 2024
662bd9d
sync replica up.
hoseinaghaei Feb 15, 2024
a578bbc
sync replica up.
hoseinaghaei Feb 15, 2024
334b01f
log.
hoseinaghaei Feb 15, 2024
554c664
log.
hoseinaghaei Feb 15, 2024
306427b
fix none replica.
hoseinaghaei Feb 15, 2024
f60622f
fix none replica.
hoseinaghaei Feb 15, 2024
9d31c7d
fix none replica.
hoseinaghaei Feb 15, 2024
7009c1a
fix none replica.
hoseinaghaei Feb 15, 2024
8c9a093
some logs.
hoseinaghaei Feb 15, 2024
8dce07b
fix error.
hoseinaghaei Feb 15, 2024
0de02ca
logs.
hoseinaghaei Feb 15, 2024
c4fc631
write without replica.
hoseinaghaei Feb 15, 2024
e76ebed
fix.
hoseinaghaei Feb 15, 2024
59aedac
add timeout to request and fix static analysis
Feb 15, 2024
1b2bf70
fix cicd
Feb 15, 2024
f737b86
update checkout action
Feb 15, 2024
8cf5ec5
fix lint
Feb 15, 2024
7d3bd6a
fix mod func.
hoseinaghaei Feb 15, 2024
9c71625
Merge remote-tracking branch 'origin/master'
hoseinaghaei Feb 15, 2024
d405801
fix.
hoseinaghaei Feb 15, 2024
46f9201
fix.
hoseinaghaei Feb 15, 2024
1a42ecc
fix monitoring
Feb 15, 2024
9e1b03c
fix.
hoseinaghaei Feb 15, 2024
48f7d8f
Merge remote-tracking branch 'origin/master'
hoseinaghaei Feb 15, 2024
6168fdb
print log on docker.
hoseinaghaei Feb 15, 2024
0e3fd29
fix.
hoseinaghaei Feb 15, 2024
c8977a1
fix.
hoseinaghaei Feb 15, 2024
209d07d
fix monitoring
Feb 15, 2024
769efdc
fix.
hoseinaghaei Feb 15, 2024
da19c4b
Merge remote-tracking branch 'origin/master'
hoseinaghaei Feb 15, 2024
3f2f87f
fix.
hoseinaghaei Feb 15, 2024
407f70e
fix.
hoseinaghaei Feb 15, 2024
f101371
fix monitoring
Feb 15, 2024
3394e2f
fix.
hoseinaghaei Feb 15, 2024
e21bfc4
Merge remote-tracking branch 'origin/master'
hoseinaghaei Feb 15, 2024
874de31
fix.
hoseinaghaei Feb 15, 2024
355412a
fix.
hoseinaghaei Feb 15, 2024
daa6533
fix.
hoseinaghaei Feb 15, 2024
59794b2
fix.
hoseinaghaei Feb 15, 2024
85cdfab
fix.
hoseinaghaei Feb 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ jobs:
uses: docker/build-push-action@v5
with:
context: ./kafka_server/coordinator/
platforms: linux/amd64,linux/arm64
push: true
tags: mahdigheidi/sad-coordinator:latest

Expand All @@ -66,18 +67,18 @@ jobs:
matrix:
python-version: ["3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8
pip install flake8
- name: Analysing the code with flake8
run: |
flake8 --ignore E501 kafka_server/
flake8 --ignore=E401,E402,E501 kafka_server/
# - name: Test with pytest
# run: |
# pytest
Expand All @@ -88,10 +89,10 @@ jobs:

steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: 3.x

Expand All @@ -101,7 +102,7 @@ jobs:

- name: Run bandit
run: |
bandit -r .
bandit --skip B104 -r .


deploy-prod-broker:
Expand All @@ -111,7 +112,7 @@ jobs:

steps:
- name: checkout repo
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: set up ssh keys
uses: shimataro/ssh-key-action@v2
Expand All @@ -120,18 +121,41 @@ jobs:
known_hosts: ${{ secrets.SSH_HOST }}
- name: connect and pull
run: |
ssh -i ~/.ssh/id_rsa [email protected] "docker compose up -d broker-1 broker-2 broker-3 && exit"
- name: cleanup
run: rm -rf ~/.ssh
ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d broker-1 broker-2 broker-3 && exit"

deploy-prod-coordinator:
name: Deploy Coordinator to Production
runs-on: ubuntu-latest
runs-on: self-hosted
needs: build-push-coordinator

steps:
- name: Deploy to production
run: |
# Here you can add your deployment commands/scripts
# For example:
ssh user@your-production-server 'docker pull ghcr.io/your-github-username/your-repo-name:latest && docker-compose up -d'
- name: checkout repo
uses: actions/checkout@v4

- name: set up ssh keys
uses: shimataro/ssh-key-action@v2
with:
key: ${{ secrets.SSH_PRIVATE_KEY }}
known_hosts: ${{ secrets.SSH_HOST }}
- name: connect and pull
run: |
ls
ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d coordinator-1 coordinator-2 && exit"

deploy-monitoring:
name: Deploy Monitoring to Production
runs-on: self-hosted

steps:
- name: checkout repo
uses: actions/checkout@v4

- name: set up ssh keys
uses: shimataro/ssh-key-action@v2
with:
key: ${{ secrets.SSH_PRIVATE_KEY }}
known_hosts: ${{ secrets.SSH_HOST }}
- name: connect and pull
run: |
ls
ssh -i ~/.ssh/id_rsa -o StrictHostKeyChecking=no ${{ secrets.SSH_USER }}@${{ secrets.SSH_HOST }} "docker-compose up -d node_exporter prometheus grafana && exit"
41 changes: 25 additions & 16 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,36 +45,45 @@ services:

prometheus:
image: prom/prometheus:latest
command:
- --storage.tsdb.retention.time=7d
- --config.file=/etc/prometheus/prometheus.yml
# - --storage.tsdb.retention.size=10GB
container_name: prometheus
restart: unless-stopped
volumes:
- ./monitoring/prometheus:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
networks:
- broker

grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- ./monitoring/grafana-provisioning/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana-provisioning/datasources:/etc/grafana/provisioning/datasources

node_exporter_1:
image: prom/node-exporter:latest
ports:
- "9100:9100"
- grafana-data:/var/lib/grafana
networks:
- broker

node_exporter_2:
image: prom/node-exporter:latest
node_exporter:
image: quay.io/prometheus/node-exporter:latest
container_name: node_exporter
command:
- '--path.rootfs=/host'
pid: host
ports:
- "9101:9100"
- "9100:9100"
restart: unless-stopped
volumes:
- '/:/host:ro,rslave'
networks:
- broker

volumes:
prometheus-data:
driver: local
grafana-data:
driver: local

networks:
broker:
driver: bridge
3 changes: 2 additions & 1 deletion kafka_server/broker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ COPY . .
RUN pip install -r requirements.txt
EXPOSE 5003
ENV FLASK_APP=app
CMD ["python", "-u", "controller/produce.py"]
CMD ["python", "-u", "controller/produce.py"]
# CMD ["python", "controller/produce.py"]
11 changes: 3 additions & 8 deletions kafka_server/broker/controller/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _post(data, url: str):
print(f"master coordinator {_master_coordinator_url()} is not alive")

try:
response = requests.post(coordinator, json=data, data=json.dumps(data))
response = requests.post(coordinator, json=data, data=json.dumps(data), timeout=2)
return response.status_code == 200
except requests.RequestException as e:
print(f"Error on post {e}")
Expand All @@ -55,7 +55,7 @@ def _get(data, url: str):
coordinator = _url(master_not_replica=master_alive, url=url)
if not master_alive:
print(f"master coordinator {_master_coordinator_url()} is not alive")
response = requests.get(coordinator, json=data, data=json.dumps(data))
response = requests.get(coordinator, json=data, data=json.dumps(data), timeout=2)
if response.status_code == 200:
return json.loads(response.content.decode("utf-8"))
except requests.RequestException as e:
Expand All @@ -78,12 +78,7 @@ def heartbeat():
'ip': os.getenv("IP"),
'port': os.getenv("PORT"),
}

if _post(payload, heartbeat_url):
print(f"Heartbeat {payload}")
else:
print("Heartbeat Failed")

_post(payload, heartbeat_url)
time.sleep(3) # 3 Seconds wait for another heartbeat


Expand Down
20 changes: 12 additions & 8 deletions kafka_server/broker/controller/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
import os
import sys
import threading
from main import init


BROKER_PROJECT_PATH = os.getenv("BROKER_PROJECT_PATH", "/app/")
sys.path.append(os.path.abspath(BROKER_PROJECT_PATH))

from file.indexer import Indexer
from file.read import Read
from file.write import Write
from main import init
from flask import Flask, request, jsonify
from manager.env import get_primary_partition, get_replica_url
from metrics import coordinator_write_requests, coordinator_replicate_index_requests
from prometheus_client import make_wsgi_app
from werkzeug.middleware.dispatcher import DispatcherMiddleware

BROKER_PROJECT_PATH = os.getenv("BROKER_PROJECT_PATH", "/app/")
sys.path.append(os.path.abspath(BROKER_PROJECT_PATH))


app = Flask(__name__)
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
Expand All @@ -34,7 +36,7 @@ def write():
# Assuming the request body is in JSON format with 'key' and 'value' fields
data = request.get_json()
key = data.get('key')
value = data.get('value').encode('utf-8')
value = data.get('value')
print(key, value)

write_instance = Write(get_primary_partition(), get_replica_url())
Expand Down Expand Up @@ -98,7 +100,7 @@ def subscription():
data = json.loads(request.data.decode("utf-8"))
brokers = data['brokers']

brokers_file_path = os.path.join(os.getcwd(), '../data', 'subscriptions', 'brokers.json')
brokers_file_path = os.path.join(os.getcwd(), 'data', 'subscriptions', 'brokers.json')

with open(brokers_file_path, "w") as file:
json.dump(brokers, file)
Expand All @@ -120,7 +122,7 @@ def subscribers():
data = json.loads(request.data.decode("utf-8"))
brokers = data['subscribers']

subscribers_file_path = os.path.join(os.getcwd(), '../data', 'subscriptions', 'subscribers.json')
subscribers_file_path = os.path.join(os.getcwd(), 'data', 'subscriptions', 'subscribers.json')

with open(subscribers_file_path, "w+") as file:
json.dump(brokers, file)
Expand All @@ -137,6 +139,7 @@ def broker_down():
data = json.loads(request.data.decode("utf-8"))
partition = data['partition']
os.environ['REPLICA_MIRROR_DOWN'] = str(partition)
print(os.environ['REPLICA_MIRROR_DOWN'], "replica mirror down")
return jsonify({'status': 'Data written successfully.'}), 200
except Exception as e:
print(e)
Expand Down Expand Up @@ -182,4 +185,5 @@ def ack():
crun.daemon = True
crun.start()

app.run('0.0.0.0', port=5003, debug=True)
broker_listening_addr = '0.0.0.0'
app.run(broker_listening_addr, port=5003)
2 changes: 1 addition & 1 deletion kafka_server/broker/file/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@


def hash_md5(key: str):
return hashlib.md5(key.encode()).hexdigest()
return hashlib.md5(key.encode(), usedforsecurity=False).hexdigest()
21 changes: 11 additions & 10 deletions kafka_server/broker/file/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ class Indexer:

def __new__(cls, partition: str, replica: str = None):
with cls._lock:
if partition not in cls._instances:
cls._instances[partition] = super().__new__(cls)
cls._instances[partition].partition = partition
cls._instances[partition].replica = replica
cls._instances[partition]._write = 0
cls._instances[partition]._read = 0
cls._instances[partition]._sync = 0
cls._instances[partition].load()
path = cls._instances[partition].__dir_path()
if f"{partition}-{replica}" not in cls._instances:
cls._instances[f"{partition}-{replica}"] = super().__new__(cls)
cls._instances[f"{partition}-{replica}"].partition = partition
cls._instances[f"{partition}-{replica}"].replica = replica
cls._instances[f"{partition}-{replica}"].load()
path = cls._instances[f"{partition}-{replica}"].__dir_path()
os.makedirs(path, exist_ok=True)
return cls._instances[partition]

return cls._instances[f"{partition}-{replica}"]

def load(self):
self._write = self._load_variable('write')
Expand Down Expand Up @@ -106,9 +104,12 @@ def __dir_path(self) -> str:

def send_to_replica(self):
if self.replica is None:
print("No replica found /n/n/n")
return

url = f'{self.replica}/replica/index'
data = {'partition': self.partition, 'read': self._read, 'sync': self._sync}
print(data, "to Replica")
response = requests.post(url, json=data, timeout=2)
if response.status_code != 200:
raise Exception(f'indexed not yet updated {response}')
Loading