diff --git a/src/ac_training_lab/picow/digital-pipette/time-sync-micro.py b/src/ac_training_lab/picow/digital-pipette/time-sync-micro.py index 8cd30c2..36037fd 100644 --- a/src/ac_training_lab/picow/digital-pipette/time-sync-micro.py +++ b/src/ac_training_lab/picow/digital-pipette/time-sync-micro.py @@ -11,39 +11,40 @@ ensuring consistent time synchronization. """ -import sys +import asyncio import json import ssl -import asyncio +import sys +from time import sleep, time + import ntptime -from uio import StringIO -from time import time, sleep import urequests_2 as urequests -#import csv# for MongoDB Data API - -# WiFi -from netman import connectWiFi # Hardware -from machine import PWM,Pin +from machine import PWM, Pin # MQTT from mqtt_as import MQTTClient, config - from my_secrets import ( - SSID, - PASSWORD, + CLUSTER_NAME, + COLLECTION_NAME, COURSE_ID, + DATA_API_KEY, + DATABASE_NAME, + ENDPOINT_BASE_URL, HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME, - DATA_API_KEY, - ENDPOINT_BASE_URL, - CLUSTER_NAME, - DATABASE_NAME, - COLLECTION_NAME, + PASSWORD, + SSID, ) +# WiFi +from netman import connectWiFi +from uio import StringIO + +# import csv# for MongoDB Data API + # Description: Receive commands from HiveMQ and send sensor data to HiveMQ @@ -96,10 +97,6 @@ ) - - - - # MQTT Topics command_topic = "time-sync/orchestrator" micro_topic = "time-sync/motor1" @@ -108,16 +105,19 @@ print(f"Command topic: {command_topic}") print(f"Sensor data topic: {micro_topic}") + def set_servo_angle(angle): # Convert the angle to duty cycle min_duty = 1638 # Corresponds to 0 degrees (1 ms pulse width) max_duty = 8192 # Corresponds to 180 degrees (2 ms pulse width) - + # Calculate the duty cycle for the given angle duty = int(min_duty + (max_duty - min_duty) * (angle / 180)) servo.duty_u16(duty) + + async def messages(client): - global received_time # Respond to incoming messages + global received_time # Respond to incoming messages global duty_for_2ms async for topic, msg, retained in client.queue: try: @@ -127,26 +127,40 @@ async def messages(client): print((topic, msg, retained)) if topic == command_topic: - + data = json.loads(msg) time = data["ntp_time"] received_time = "True" sg90.duty_u16(duty_for_2ms) - rtt = (ntptime.time() - time) - adjusted_time = time + rtt #The orchestrators time once the message is received + rtt = ntptime.time() - time + adjusted_time = ( + time + rtt + ) # The orchestrators time once the message is received print(time) current_time = ntptime.time() - drift = adjusted_time - current_time #finding the clock drift (if any) + drift = adjusted_time - current_time # finding the clock drift (if any) print(current_time) - if abs(drift) >= 1:#If the clock drift is significant(greater than 1 second) - ntptime.time(time()+drift)#adjusts the internal clock to account for a drift + if ( + abs(drift) >= 1 + ): # If the clock drift is significant(greater than 1 second) + ntptime.time( + time() + drift + ) # adjusts the internal clock to account for a drift stored_data.append(time) - payload_json = json.dumps({"ntp_time": current_time})#convert current_time to a json string + payload_json = json.dumps( + {"ntp_time": current_time} + ) # convert current_time to a json string try: - await client.publish(micro_topic, payload_json, qos=1, retain=False)#publishes ntp time for the orchestrator to read - print(f"Message published to topic {sensor_data_topic}: {payload_json}") + await client.publish( + micro_topic, payload_json, qos=1, retain=False + ) # publishes ntp time for the orchestrator to read + print( + f"Message published to topic {sensor_data_topic}: {payload_json}" + ) except Exception as e: - print(f"Failed to publish message to topic {sensor_data_topic}: {e}") + print( + f"Failed to publish message to topic {sensor_data_topic}: {e}" + ) except Exception as e: with StringIO() as f: # type: ignore @@ -167,24 +181,23 @@ async def main(client): await client.connect() for coroutine in (up, messages): asyncio.create_task(coroutine(client)) - y = duty_for_2ms - sync_interval = 300 # sync every 5 minutes + sync_interval = 300 # sync every 5 minutes start_time = 0 sg90.duty_u16(duty_for_2ms) asyncio.sleep(1) - y = y-200 + y = y - 200 sg90.duty_u16(y) # must have the while True loop to keep the program running while True: if received_time == "True": if start_time == 0: start_time = time() - + await asyncio.sleep(5) - - duty_for_2ms = duty_for_2ms -100 + + duty_for_2ms = duty_for_2ms - 100 if duty_for_2ms == 0.075 * 65535: duty_for_2ms = 0.08 * 65535 elapsed_time = round(time() - start_time) @@ -192,13 +205,13 @@ async def main(client): if elapsed_time > sync_interval: try: ntptime.settime() - start_time = time()#Reset start time after sync + start_time = time() # Reset start time after sync except Exception as e: print(f"Failed to resyncrhonize time : {e}") else: x = 0 await asyncio.sleep(5) - elapsed_time = x+5 + elapsed_time = x + 5 print(f"Elapsed before start: {elapsed_time}s") @@ -208,8 +221,4 @@ async def main(client): try: asyncio.run(main(client)) finally: - client.close() - - - - + client.close() diff --git a/src/ac_training_lab/picow/digital-pipette/time-sync-orches.py b/src/ac_training_lab/picow/digital-pipette/time-sync-orches.py index 721bffa..8aa926b 100644 --- a/src/ac_training_lab/picow/digital-pipette/time-sync-orches.py +++ b/src/ac_training_lab/picow/digital-pipette/time-sync-orches.py @@ -10,23 +10,18 @@ """ - -import os +import asyncio import json +import os import threading +from queue import Empty, Queue from time import time -import asyncio -import numpy as np -import pandas as pd + import ntplib -from queue import Queue, Empty +import numpy as np import paho.mqtt.client as paho -from my_secrets import ( - HIVEMQ_HOST, - HIVEMQ_PASSWORD, - HIVEMQ_USERNAME -) - +import pandas as pd +from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME username = HIVEMQ_USERNAME password = HIVEMQ_PASSWORD @@ -34,11 +29,12 @@ published_time = 0 received_time = 0 -#Returns the current time of the npt servers -def get_ntp_time(server='time.google.com'): + +# Returns the current time of the npt servers +def get_ntp_time(server="time.google.com"): try: print("Creating NTP client") - client=ntplib.NTPClient() + client = ntplib.NTPClient() print(f"Requesting time from server: {server}") response = client.request(server) print("Received response from server") @@ -48,25 +44,27 @@ def get_ntp_time(server='time.google.com'): except Exception as e: print("failed to connect to server") return None - -publish_topic = 'time-sync/orchestrator' -motor1_topic = 'time-sync/motor1' -motor2_topic = 'time-sync/motor2' + + +publish_topic = "time-sync/orchestrator" +motor1_topic = "time-sync/motor1" +motor2_topic = "time-sync/motor2" subscribe_topics = [motor1_topic, motor2_topic] + def get_client_and_queue( subscribe_topic, host, username, password, port=8883, tls=True ): - ''' + """ Set up the Client and queue as well as receive messages from the motors - + Parameters - + ---------- subscribe_topic : list A list of the motor topics that the orchestrator receives messages from - host: - + host: + host : str The hostname or IP address of the MQTT server to connect to. username : str @@ -78,7 +76,7 @@ def get_client_and_queue( tls : bool, optional Whether to use TLS for the connection, by default True. - ''' + """ client = paho.Client() # create new instance queue = Queue() # Create queue to store sensor data connected_event = threading.Event() # event to wait for connection @@ -93,13 +91,17 @@ def on_message(client, userdata, msg): elif "received_time" in data: print("skip") else: - raise KeyError (f"Neither 'ntp_time' nor 'received_time' found in the message : {data}") - + raise KeyError( + f"Neither 'ntp_time' nor 'received_time' found in the message : {data}" + ) + except KeyError as e: print(f"Error: {e}") queue.put(data) - print(f"Queue contents after receiving message: {[item for item in queue.queue]}") - #Finds the difference between the published time and the time recieved to find the difference + print( + f"Queue contents after receiving message: {[item for item in queue.queue]}" + ) + # Finds the difference between the published time and the time recieved to find the difference diff = published_time - received_time print("PUBLISHED TIME") print(published_time) @@ -108,15 +110,16 @@ def on_message(client, userdata, msg): print(diff) def on_connect(client, userdata, flags, rc): - #Subscribes to all topics in the list - for i in range (len(subscribe_topics)): + # Subscribes to all topics in the list + for i in range(len(subscribe_topics)): client.subscribe(subscribe_topics[i], qos=1) connected_event.set() + client.on_connect = on_connect client.on_message = on_message # enable TLS for secure connection if tls: - client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS_CLIENT) + client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS_CLIENT) # set username and password client.username_pw_set(username, password) # connect to HiveMQ Cloud on port 8883 @@ -125,6 +128,7 @@ def on_connect(client, userdata, flags, rc): connected_event.wait(timeout=10.0) return client, queue + async def publish_ntp_time(client, topic): """ Publish the current NTP time to the specified MQTT topic. @@ -140,8 +144,10 @@ async def publish_ntp_time(client, topic): current_time = get_ntp_time() # Get the current time published_time = round(current_time) payload = json.dumps({"ntp_time": current_time}) - client.publish(topic, payload, qos=1, retain = False) + client.publish(topic, payload, qos=1, retain=False) print(f"Published NTP time: {current_time} to topic: {topic}") + + async def publish_rec_time(client, topic): """ Publish the NTP time received by the motors to the specified MQTT topic. @@ -154,10 +160,12 @@ async def publish_rec_time(client, topic): The MQTT topic to publish the NTP time to. """ global received_time - payload = json.dumps({"received_time":received_time}) - client.publish(topic, payload, qos=1, retain = False) + payload = json.dumps({"received_time": received_time}) + client.publish(topic, payload, qos=1, retain=False) + + async def main(): - client, queue = get_client_and_queue(subscribe_topics,host,username, password) + client, queue = get_client_and_queue(subscribe_topics, host, username, password) client.loop_start() print("started") @@ -166,7 +174,7 @@ async def main(): while True: await asyncio.sleep(5) elapsed_time = round(time() - start_time) - await publish_ntp_time(client,publish_topic) + await publish_ntp_time(client, publish_topic) await publish_rec_time(client, publish_topic) print(f"Elapsed: {elapsed_time}s") if elapsed_time >= 600: @@ -174,7 +182,8 @@ async def main(): print("stopped") client.loop_stop() client.disconnect() + + if __name__ == "__main__": asyncio.run(main()) -#need a method to indicate if it is time synced - +# need a method to indicate if it is time synced