Skip to content

Commit

Permalink
Merge pull request #1280 from MetPX/issue1275_attempts0fix
Browse files Browse the repository at this point in the history
for #1275 standby mode can be implemented with attempts=0
  • Loading branch information
petersilva authored Oct 30, 2024
2 parents 8fcd72b + 7ca9115 commit b649f0c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 9 deletions.
6 changes: 6 additions & 0 deletions docs/source/Explanation/CommandLineGuide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ will be:
* run: all processes are running (and transferring, and not behind, and not slow... normal state.)
* slow: transfering less than minimum bytes/second ( runStateThreshold_slow )
* stop: no processes are running.
* stby: Standby mode: all processes running, but messages are being stored in the local download_retry queue.
* wVip: process doesn't have the vip (only applies when the vip option is specified in the config)

The next columns to the right give more information, detailing how many processes are Running, out of the number expected.
Expand Down Expand Up @@ -2057,6 +2058,11 @@ are errors. The back-off can accumulate to the point where retries could be sepa
or two. Once the server begins responding normally again, the programs will return to normal
processing speed.

If a failure will last for a while, one can stop the flow, configure *attempts 0* to fill the
retry queue without making vain attempts to download or send. At the end of the outage, return
*attempts* to normal, and the retry queue will gradually be drained when there
is room in the current data flow.


EXAMPLES
========
Expand Down
24 changes: 24 additions & 0 deletions docs/source/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,30 @@ of **attempts** (or send, in a sender) will cause the notification message to be
for later retry. When there are no notification messages ready to consume from the AMQP queue,
the retry queue will be queried.

If:

* It is known that transfers will fail for a long time, because of some sort of outage or maintenance
on your destination

* There is a large volume of files you expect to queue up for transfer. so the queues
on the data pump will grow to a point where the pump admins will be uncomfortable.
Note that: All advice about message broker performance and availability tuning
asks users to minimize queueing on the brokers.

* The local state directory ( ~/.cache ) is writable during the outage period.

Then:

One can set *attempts* to 0. This will cause messages queued for transfer to be written
to local download_retry queues (written in the local state directories) and offload
the broker.

When *attempts* is 0, the *sr3 status* command will report that the flow is in the
*standby* state. The retry queue count will rise, and only messages (no data) will be transferred.
When the maintenance activity or failure has been addressed.




baseDir <path> (default: /)
----------------------------
Expand Down
20 changes: 13 additions & 7 deletions docs/source/fr/Explication/GuideLigneDeCommande.rst
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,8 @@ sera :
* rtry : tous les processus en cours d'exécution, mais un grand nombre de transferts échouent, causant d'autres tentatives (runStateThreshold_retry )
* run : tous les processus sont en cours d'exécution (et en transfert, et pas en retard, et pas lents... état normal.)
* slow : transfert de moins que le minimum d'octets/seconde ( runStateThreshold_slow )
* stby : Mode veille (Standby): tous les processus sont en cours d'exécution, mais les messages sont stockés dans la file d'attente download_retry locale.
* stop : aucun processus n'est en cours d'exécution.

Les colonnes à droite donnent plus d’informations, détaillant le nombre de processus en cours d’exécution à partir du nombre attendu.
Par exemple, 3/3 signifie 3 processus ou instances sont trouvés à partir des 3 attendus.
Expected liste combien de processus devraient être exécutés à partir d'une configuration même si ils sont arrêtés.
Expand Down Expand Up @@ -694,8 +694,8 @@ utilisera le nom de file d'attente ainsi défini.



AMQP QUEUE BINDINGS
-------------------
Liasons AMQP QUEUE
------------------

Une fois qu'on a une fil d'attente, elle doit être liée à un échange (exchange.)
Les utilisateurs ont presque toujours besoin de définir ces options. Une
Expand Down Expand Up @@ -784,8 +784,8 @@ On peut désactiver la liaison de fil d’attente comme cela::



Client-side Filtering
---------------------
Filtrage côté client
--------------------

Nous avons sélectionné nos messages via **exchange**, **subtopic** et **subtopic**.
Le courtier met les messages correspondants dans notre fil d'attente (*queue*).
Expand All @@ -794,8 +794,8 @@ Le composant télécharge ces messages.
Les clients Sarracenia implémentent un filtrage plus flexible côté client
en utilisant les expressions régulières.

Brief Introduction to Regular Expressions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Bref introduction aux expressions régulières
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Les expressions régulières sont un moyen très puissant d'exprimer les correspondances de motifs.
Ils offrent une flexibilité extrême, mais dans ces exemples, nous utiliserons seulement un
Expand Down Expand Up @@ -2043,6 +2043,12 @@ des erreurs. Le back-off peut s’accumuler au point où les nouvelles tentative
ou deux. Une fois que le serveur recommence à répondre normalement, les programmes reviendront à la
vitesse normale de traitement.

Si une panne dure un certain temps, on peut arrêter le flux, configurer *attempts 0* pour remplir la
file d'attente de nouvelles tentatives sans faire de vaines tentatives de téléchargement ou d'envoi. À la fin de la panne,
remettez *attempts* à la normale et la file d'attente de nouvelles tentatives sera progressivement vidée lorsqu'il y aura
de la place dans le flux de données actuel.


EXEMPLES
========

Expand Down
22 changes: 22 additions & 0 deletions docs/source/fr/Reference/sr3_options.7.rst
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,28 @@ des **attempts** (ou d’envoi, pour un sender) va entrainer l’ajout du messag
pour une nouvelle tentative plus tard. Lorsque aucun message d'annonce n’est prêt à être consommé dans la fil d’attente AMQP,
les requêtes se feront avec la fil d’attente de "retry".

Si:

* on sait que les transferts échoueront pendant une longue période, en raison d'une panne ou d'une maintenance
à la destination.

* Vous vous attendez à ce qu'un grand volume de fichiers soit mis en file d'attente pour transfert. Les files d'attente
sur la pompe de données augmenteront donc jusqu'à un point où les administrateurs de la pompe ne seront plus à l'aise.
Notez que : Tous les conseils sur le réglage des performances et de la disponibilité de courtier de messages
demandent aux utilisateurs de minimiser la population des files d'attente sur les courtiers.

* Le répertoire d'état local ( ~/.cache ) est accessible en écriture pendant la période de la panne.

Alors :

On peut définir *attempts* sur 0. Cela entraînera l'écriture des messages mis en file d'attente pour le transfert
dans les files d'attente de *download_retry* locales (écrites dans les répertoires d'état locaux) et déchargera
le courtier.

Lorsque *attempts* est égal à 0, la commande *sr3 status* signalera que le flux est dans l'état
*standby*. Le nombre de files d'attente de nouvelles tentatives augmentera et seuls les messages (pas de données) seront transférés.
Lorsque l'activité de maintenance ou la panne a été résolue.

baseDir <chemin> (défaut: /)
----------------------------

Expand Down
2 changes: 2 additions & 0 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1875,6 +1875,7 @@ def do_download(self) -> None:
parsed_url = sarracenia.baseUrlParse(msg['baseUrl'])
self.scheme = parsed_url.scheme

ok = False
i = 1
while i <= self.o.attempts:

Expand Down Expand Up @@ -2856,6 +2857,7 @@ def do_send(self):

# N attempts to send

ok = False
i = 1
while i <= self.o.attempts:
if i != 1:
Expand Down
12 changes: 10 additions & 2 deletions sarracenia/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1113,11 +1113,16 @@ def _resolve(self):
elif self.states[c][cfg]['metrics']['byteRate'] < self.configs[c][cfg]['options'].runStateThreshold_slow:
flow_status = 'slow'
elif self.states[c][cfg]['metrics']['retry'] > self.configs[c][cfg]['options'].runStateThreshold_retry:
flow_status = 'retry'
if self.configs[c][cfg]['options'].attempts == 0:
flow_status='standby'
else:
flow_status = 'retry'
elif self.states[c][cfg]['metrics']['lagMean'] > self.configs[c][cfg]['options'].runStateThreshold_lag:
flow_status = 'lagging'
elif self.states[c][cfg]['metrics']['rejectPercent'] > self.configs[c][cfg]['options'].runStateThreshold_reject:
flow_status = 'reject'
elif self.configs[c][cfg]['options'].attempts == 0:
flow_status='standby'
elif hasattr(self.configs[c][cfg]['options'],'post_broker') and self.configs[c][cfg]['options'].post_broker \
and (now-self.states[c][cfg]['metrics']['txLast']) > self.configs[c][cfg]['options'].runStateThreshold_idle:
flow_status = 'idle'
Expand All @@ -1130,6 +1135,7 @@ def _resolve(self):
self.states[c][cfg]['metrics']['msgRateCpu'] < self.configs[c][cfg]['options'].runStateThreshold_cpuSlow:
flow_status = 'cpuSlow'
else:

flow_status = 'running'

self.states[c][cfg]['resource_usage'] = copy.deepcopy(resource_usage)
Expand Down Expand Up @@ -1306,7 +1312,7 @@ def __init__(self, opt, config_fnmatches=None):
'sender', 'shovel', 'subscribe', 'watch', 'winnow'
]
# active means >= 1 process exists on the node.
self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'waitVip' ]
self.status_active = ['cpuSlow', 'hung', 'idle', 'lagging', 'partial', 'reject', 'retry', 'running', 'slow', 'standby', 'waitVip' ]
self.status_values = self.status_active + [ 'disabled', 'include', 'missing', 'stopped', 'unknown' ]

self.bin_dir = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -2631,6 +2637,8 @@ def status(self):
cfg_status = "rtry"
if cfg_status == "runn" :
cfg_status = "run"
if cfg_status == "stan" :
cfg_status = "stby"
elif cfg_status == 'wait':
cfg_status = 'wVip'

Expand Down

0 comments on commit b649f0c

Please sign in to comment.