Skip to content

Commit

Permalink
More fixes to the deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
leboiko committed Dec 16, 2024
1 parent 601439a commit c60be80
Show file tree
Hide file tree
Showing 33 changed files with 392 additions and 355 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ volume
.env.docker
.env.dump
logs/*
kube_files/test_postgres.yaml
21 changes: 14 additions & 7 deletions consumer/src/consumer_type/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,23 @@ impl BasicConsumer for Sqs {

/// This function receives a [`String`] message and try to send it. Note
/// that the message is serialized into a JSON string before being sent.
async fn send_message(&self, message: String) -> Result<(), ConsumerError> {
self.get_client()
async fn send_message(
&self,
message: String,
group_id: Option<String>,
) -> Result<(), ConsumerError> {
let mut message = self
.get_client()
.await
.send_message()
.queue_url(&*self.get_output_queue())
.message_body(&message)
// If the queue is FIFO, you need to set .message_deduplication_id
// and message_group_id or configure the queue for ContentBasedDeduplication.
.send()
.await?;
.message_body(&message);
// If we are using a FIFO queue, we need to set the message group id
if let Some(group_id) = group_id {
message = message.message_group_id(group_id);
}

message.send().await?;

Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion consumer/src/mode/decoded/atom/atom_supported_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ pub async fn get_supported_atom_metadata(
ResolverConsumerMessage::new_atom(atom.clone(), decoded_atom_data.to_string());
decoded_consumer_context
.client
.send_message(serde_json::to_string(&message)?)
.send_message(
serde_json::to_string(&message)?,
Some("decoded".to_string()),
)
.await?;

// Now we try to parse the JSON and return the metadata. At this point
Expand Down
5 changes: 4 additions & 1 deletion consumer/src/mode/decoded/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ pub async fn get_or_create_account(
let message = ResolverConsumerMessage::new_account(account.clone());
decoded_consumer_context
.client
.send_message(serde_json::to_string(&message)?)
.send_message(
serde_json::to_string(&message)?,
Some("decoded".to_string()),
)
.await?;
Ok(account)
}
Expand Down
2 changes: 1 addition & 1 deletion consumer/src/mode/raw/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ConsumerMode {
let message = DecodedMessage::new(event, raw_message.body);
raw_consumer_context
.client
.send_message(serde_json::to_string(&message)?)
.send_message(serde_json::to_string(&message)?, Some("raw".to_string()))
.await?;
info!("Sent a decoded message to the queue!");
}
Expand Down
7 changes: 4 additions & 3 deletions consumer/src/mode/resolver/ens_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ impl Ens {
info!("Sending image to IPFS upload consumer: {}", url);
consumer_context
.client
.send_message(serde_json::to_string(&IpfsUploadMessage {
image: url.clone(),
})?)
.send_message(
serde_json::to_string(&IpfsUploadMessage { image: url.clone() })?,
None,
)
.await?;
Ok(Some(url))
} else {
Expand Down
2 changes: 1 addition & 1 deletion consumer/src/mode/resolver/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl ResolverMessageType {
info!("Sending image to IPFS upload consumer: {}", image);
resolver_consumer_context
.client
.send_message(serde_json::to_string(&IpfsUploadMessage { image })?)
.send_message(serde_json::to_string(&IpfsUploadMessage { image })?, None)
.await?;
}

Expand Down
6 changes: 5 additions & 1 deletion consumer/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ pub trait BasicConsumer: Send + Sync {
/// different modes, different data sources and different consumer types.
async fn process_messages(&self, mode: ConsumerMode) -> Result<(), ConsumerError>;
async fn receive_message(&self) -> Result<ReceiveMessageOutput, ConsumerError>;
async fn send_message(&self, message: String) -> Result<(), ConsumerError>;
async fn send_message(
&self,
message: String,
group_id: Option<String>,
) -> Result<(), ConsumerError>;
}

/// This trait needs to be implemented by every new data source that we want to
Expand Down
5 changes: 2 additions & 3 deletions kube_files/api/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ spec:
containers:
- name: api
image: ghcr.io/0xintuition/image-guard:latest
command: ["sh", "-c", "echo $DATABASE_URL && sleep 3600"]
imagePullPolicy: Always
ports:
- containerPort: 3000
Expand All @@ -26,12 +25,12 @@ spec:
readOnly: true
envFrom:
- secretRef:
name: aws-secrets
name: api-aws-secrets
volumes:
- name: secrets-store-inline
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: "aws-secrets"
secretProviderClass: "api-aws-secrets"

4 changes: 2 additions & 2 deletions kube_files/api/secret-provider.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
name: aws-secrets
name: api-aws-secrets
namespace: default
spec:
provider: aws
Expand All @@ -27,7 +27,7 @@ spec:
- path: "FLAG_LOCAL_WITH_CLASSIFICATION"
objectAlias: "FLAG_LOCAL_WITH_CLASSIFICATION"
secretObjects:
- secretName: "aws-secrets" # The name of the Kubernetes secret to create
- secretName: "api-aws-secrets" # The name of the Kubernetes secret to create
type: Opaque
data:
- objectName: "CLASSIFICATION_API_PORT"
Expand Down
87 changes: 45 additions & 42 deletions kube_files/consumers/decoded/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,56 +1,59 @@
apiVersion: apps/v1
kind: Deployment
metadata:
annotations:
kompose.cmd: kompose convert -f docker-compose-kompose.yml
kompose.version: 1.34.0 (HEAD)
labels:
io.kompose.service: decoded-consumer
name: decoded-consumer
spec:
replicas: 1
selector:
matchLabels:
app: decoded-consumer
io.kompose.service: decoded-consumer
strategy:
type: Recreate
template:
metadata:
annotations:
kompose.cmd: kompose convert -f docker-compose-kompose.yml
kompose.version: 1.34.0 (HEAD)
labels:
app: decoded-consumer
io.kompose.service: decoded-consumer
spec:
serviceAccountName: secrets-access-sa
containers:
- name: decoded-consumer
image: ghcr.io/0xintuition/consumer:latest
args: ["./consumer", "--mode", "decoded"]
ports:
- containerPort: 3002
env:
- name: AWS_REGION
value: "us-west-2"
- name: AWS_STS_REGIONAL_ENDPOINTS
value: "regional"
- name: AWS_ROLE_ARN
value: "arn:aws:iam::064662847354:role/aws-secrets"
- name: AWS_WEB_IDENTITY_TOKEN_FILE
value: "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
volumeMounts:
- name: secrets-store-inline
mountPath: "/mnt/secrets"
readOnly: true
- name: decoded-logs
mountPath: "/var/log/app"
- name: aws-iam-token
mountPath: "/var/run/secrets/eks.amazonaws.com/serviceaccount"
readOnly: true
- args:
- ./consumer
- --mode
- decoded
envFrom:
- secretRef:
name: decoded-aws-secrets
image: ghcr.io/0xintuition/consumer:latest
imagePullPolicy: Always
name: decoded-consumer
ports:
- containerPort: 3002
protocol: TCP
volumeMounts:
- name: secrets-store
mountPath: "/mnt/secrets"
readOnly: true
- name: decoded-logs
mountPath: /logs
readOnly: false
securityContext:
runAsUser: 0
runAsGroup: 0
volumes:
- name: secrets-store-inline
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: "aws-secrets"
- name: decoded-logs
persistentVolumeClaim:
claimName: decoded-logs
- name: aws-iam-token
projected:
sources:
- serviceAccountToken:
path: token
expirationSeconds: 86400


- name: secrets-store
csi:
driver: secrets-store.csi.k8s.io
readOnly: true
volumeAttributes:
secretProviderClass: decoded-aws-secrets
- name: decoded-logs
persistentVolumeClaim:
claimName: decoded-logs
49 changes: 0 additions & 49 deletions kube_files/consumers/decoded/errors.txt

This file was deleted.

17 changes: 6 additions & 11 deletions kube_files/consumers/decoded/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: default

resources:
- storage-class.yaml
- pvc.yaml
- deployment.yaml
- service.yaml
- service-account.yaml
- deployment.yaml
- pvc.yaml
- service.yaml
- secret-provider.yaml

labels:
- includeSelectors: true
pairs:
component: decoded-consumer
commonLabels:
component: decoded-consumer
5 changes: 2 additions & 3 deletions kube_files/consumers/decoded/pvc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: decoded-logs
namespace: default
spec:
storageClassName: gp2
accessModes:
- ReadWriteOnce
storageClassName: gp2-immediate
resources:
requests:
storage: 1Gi
storage: 2Gi
57 changes: 57 additions & 0 deletions kube_files/consumers/decoded/secret-provider.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
apiVersion: secrets-store.csi.x-k8s.io/v1
kind: SecretProviderClass
metadata:
name: decoded-aws-secrets
namespace: default
spec:
provider: aws
parameters:
objects: |
- objectName: "new-be-secrets"
objectType: "secretsmanager"
objectVersionLabel: "AWSCURRENT"
jmesPath:
- path: "AWS_ACCESS_KEY_ID"
objectAlias: "AWS_ACCESS_KEY_ID"
- path: "AWS_REGION"
objectAlias: "AWS_REGION"
- path: "AWS_SECRET_ACCESS_KEY"
objectAlias: "AWS_SECRET_ACCESS_KEY"
- path: "CONSUMER_METRICS_API_PORT"
objectAlias: "CONSUMER_METRICS_API_PORT"
- path: "CONSUMER_TYPE"
objectAlias: "CONSUMER_TYPE"
- path: "DATABASE_URL"
objectAlias: "DATABASE_URL"
- path: "DECODED_LOGS_QUEUE_URL"
objectAlias: "DECODED_LOGS_QUEUE_URL"
- path: "INTUITION_CONTRACT_ADDRESS"
objectAlias: "INTUITION_CONTRACT_ADDRESS"
- path: "RESOLVER_QUEUE_URL"
objectAlias: "RESOLVER_QUEUE_URL"
- path: "RPC_URL_BASE_MAINNET"
objectAlias: "RPC_URL_BASE_MAINNET"
secretObjects:
- secretName: "decoded-aws-secrets" # The name of the Kubernetes secret to create
type: Opaque
data:
- objectName: "AWS_ACCESS_KEY_ID"
key: "AWS_ACCESS_KEY_ID"
- objectName: "AWS_REGION"
key: "AWS_REGION"
- objectName: "AWS_SECRET_ACCESS_KEY"
key: "AWS_SECRET_ACCESS_KEY"
- objectName: "CONSUMER_METRICS_API_PORT"
key: "CONSUMER_METRICS_API_PORT"
- objectName: "CONSUMER_TYPE"
key: "CONSUMER_TYPE"
- objectName: "DATABASE_URL"
key: "DATABASE_URL"
- objectName: "DECODED_LOGS_QUEUE_URL"
key: "DECODED_LOGS_QUEUE_URL"
- objectName: "INTUITION_CONTRACT_ADDRESS"
key: "INTUITION_CONTRACT_ADDRESS"
- objectName: "RESOLVER_QUEUE_URL"
key: "RESOLVER_QUEUE_URL"
- objectName: "RPC_URL_BASE_MAINNET"
key: "RPC_URL_BASE_MAINNET"
Loading

0 comments on commit c60be80

Please sign in to comment.