Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

precommit #43

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 55 additions & 46 deletions src/ac_training_lab/picow/digital-pipette/time-sync-micro.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,40 @@
ensuring consistent time synchronization.
"""

import sys
import asyncio
import json
import ssl
import asyncio
import sys
from time import sleep, time

import ntptime
from uio import StringIO
from time import time, sleep
import urequests_2 as urequests
#import csv# for MongoDB Data API

# WiFi
from netman import connectWiFi

# Hardware
from machine import PWM,Pin
from machine import PWM, Pin

# MQTT
from mqtt_as import MQTTClient, config

from my_secrets import (
SSID,
PASSWORD,
CLUSTER_NAME,
COLLECTION_NAME,
COURSE_ID,
DATA_API_KEY,
DATABASE_NAME,
ENDPOINT_BASE_URL,
HIVEMQ_HOST,
HIVEMQ_PASSWORD,
HIVEMQ_USERNAME,
DATA_API_KEY,
ENDPOINT_BASE_URL,
CLUSTER_NAME,
DATABASE_NAME,
COLLECTION_NAME,
PASSWORD,
SSID,
)

# WiFi
from netman import connectWiFi
from uio import StringIO

# import csv# for MongoDB Data API


# Description: Receive commands from HiveMQ and send sensor data to HiveMQ

Expand Down Expand Up @@ -96,10 +97,6 @@
)






# MQTT Topics
command_topic = "time-sync/orchestrator"
micro_topic = "time-sync/motor1"
Expand All @@ -108,16 +105,19 @@
print(f"Command topic: {command_topic}")
print(f"Sensor data topic: {micro_topic}")


def set_servo_angle(angle):
# Convert the angle to duty cycle
min_duty = 1638 # Corresponds to 0 degrees (1 ms pulse width)
max_duty = 8192 # Corresponds to 180 degrees (2 ms pulse width)

# Calculate the duty cycle for the given angle
duty = int(min_duty + (max_duty - min_duty) * (angle / 180))
servo.duty_u16(duty)


async def messages(client):
global received_time # Respond to incoming messages
global received_time # Respond to incoming messages
global duty_for_2ms
async for topic, msg, retained in client.queue:
try:
Expand All @@ -127,26 +127,40 @@ async def messages(client):
print((topic, msg, retained))

if topic == command_topic:

data = json.loads(msg)
time = data["ntp_time"]
received_time = "True"
sg90.duty_u16(duty_for_2ms)
rtt = (ntptime.time() - time)
adjusted_time = time + rtt #The orchestrators time once the message is received
rtt = ntptime.time() - time
adjusted_time = (
time + rtt
) # The orchestrators time once the message is received
print(time)
current_time = ntptime.time()
drift = adjusted_time - current_time #finding the clock drift (if any)
drift = adjusted_time - current_time # finding the clock drift (if any)
print(current_time)
if abs(drift) >= 1:#If the clock drift is significant(greater than 1 second)
ntptime.time(time()+drift)#adjusts the internal clock to account for a drift
if (
abs(drift) >= 1
): # If the clock drift is significant(greater than 1 second)
ntptime.time(
time() + drift
) # adjusts the internal clock to account for a drift
stored_data.append(time)
payload_json = json.dumps({"ntp_time": current_time})#convert current_time to a json string
payload_json = json.dumps(
{"ntp_time": current_time}
) # convert current_time to a json string
try:
await client.publish(micro_topic, payload_json, qos=1, retain=False)#publishes ntp time for the orchestrator to read
print(f"Message published to topic {sensor_data_topic}: {payload_json}")
await client.publish(
micro_topic, payload_json, qos=1, retain=False
) # publishes ntp time for the orchestrator to read
print(
f"Message published to topic {sensor_data_topic}: {payload_json}"
)
except Exception as e:
print(f"Failed to publish message to topic {sensor_data_topic}: {e}")
print(
f"Failed to publish message to topic {sensor_data_topic}: {e}"
)

except Exception as e:
with StringIO() as f: # type: ignore
Expand All @@ -167,38 +181,37 @@ async def main(client):
await client.connect()
for coroutine in (up, messages):
asyncio.create_task(coroutine(client))


y = duty_for_2ms
sync_interval = 300 # sync every 5 minutes
sync_interval = 300 # sync every 5 minutes
start_time = 0
sg90.duty_u16(duty_for_2ms)
asyncio.sleep(1)
y = y-200
y = y - 200
sg90.duty_u16(y)
# must have the while True loop to keep the program running
while True:
if received_time == "True":
if start_time == 0:
start_time = time()

await asyncio.sleep(5)
duty_for_2ms = duty_for_2ms -100

duty_for_2ms = duty_for_2ms - 100
if duty_for_2ms == 0.075 * 65535:
duty_for_2ms = 0.08 * 65535
elapsed_time = round(time() - start_time)
print(f"Elapsed: {elapsed_time}s")
if elapsed_time > sync_interval:
try:
ntptime.settime()
start_time = time()#Reset start time after sync
start_time = time() # Reset start time after sync
except Exception as e:
print(f"Failed to resyncrhonize time : {e}")
else:
x = 0
await asyncio.sleep(5)
elapsed_time = x+5
elapsed_time = x + 5
print(f"Elapsed before start: {elapsed_time}s")


Expand All @@ -208,8 +221,4 @@ async def main(client):
try:
asyncio.run(main(client))
finally:
client.close()




client.close()
87 changes: 48 additions & 39 deletions src/ac_training_lab/picow/digital-pipette/time-sync-orches.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,31 @@

"""


import os
import asyncio
import json
import os
import threading
from queue import Empty, Queue
from time import time
import asyncio
import numpy as np
import pandas as pd

import ntplib
from queue import Queue, Empty
import numpy as np
import paho.mqtt.client as paho
from my_secrets import (
HIVEMQ_HOST,
HIVEMQ_PASSWORD,
HIVEMQ_USERNAME
)

import pandas as pd
from my_secrets import HIVEMQ_HOST, HIVEMQ_PASSWORD, HIVEMQ_USERNAME

username = HIVEMQ_USERNAME
password = HIVEMQ_PASSWORD
host = HIVEMQ_HOST
published_time = 0
received_time = 0

#Returns the current time of the npt servers
def get_ntp_time(server='time.google.com'):

# Returns the current time of the npt servers
def get_ntp_time(server="time.google.com"):
try:
print("Creating NTP client")
client=ntplib.NTPClient()
client = ntplib.NTPClient()
print(f"Requesting time from server: {server}")
response = client.request(server)
print("Received response from server")
Expand All @@ -48,25 +44,27 @@ def get_ntp_time(server='time.google.com'):
except Exception as e:
print("failed to connect to server")
return None

publish_topic = 'time-sync/orchestrator'
motor1_topic = 'time-sync/motor1'
motor2_topic = 'time-sync/motor2'


publish_topic = "time-sync/orchestrator"
motor1_topic = "time-sync/motor1"
motor2_topic = "time-sync/motor2"
subscribe_topics = [motor1_topic, motor2_topic]


def get_client_and_queue(
subscribe_topic, host, username, password, port=8883, tls=True
):
'''
"""
Set up the Client and queue as well as receive messages from the motors

Parameters

----------
subscribe_topic : list
A list of the motor topics that the orchestrator receives messages from
host:
host:

host : str
The hostname or IP address of the MQTT server to connect to.
username : str
Expand All @@ -78,7 +76,7 @@ def get_client_and_queue(
tls : bool, optional
Whether to use TLS for the connection, by default True.

'''
"""
client = paho.Client() # create new instance
queue = Queue() # Create queue to store sensor data
connected_event = threading.Event() # event to wait for connection
Expand All @@ -93,13 +91,17 @@ def on_message(client, userdata, msg):
elif "received_time" in data:
print("skip")
else:
raise KeyError (f"Neither 'ntp_time' nor 'received_time' found in the message : {data}")

raise KeyError(
f"Neither 'ntp_time' nor 'received_time' found in the message : {data}"
)

except KeyError as e:
print(f"Error: {e}")
queue.put(data)
print(f"Queue contents after receiving message: {[item for item in queue.queue]}")
#Finds the difference between the published time and the time recieved to find the difference
print(
f"Queue contents after receiving message: {[item for item in queue.queue]}"
)
# Finds the difference between the published time and the time recieved to find the difference
diff = published_time - received_time
print("PUBLISHED TIME")
print(published_time)
Expand All @@ -108,15 +110,16 @@ def on_message(client, userdata, msg):
print(diff)

def on_connect(client, userdata, flags, rc):
#Subscribes to all topics in the list
for i in range (len(subscribe_topics)):
# Subscribes to all topics in the list
for i in range(len(subscribe_topics)):
client.subscribe(subscribe_topics[i], qos=1)
connected_event.set()

client.on_connect = on_connect
client.on_message = on_message
# enable TLS for secure connection
if tls:
client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS_CLIENT)
client.tls_set(tls_version=paho.ssl.PROTOCOL_TLS_CLIENT)
# set username and password
client.username_pw_set(username, password)
# connect to HiveMQ Cloud on port 8883
Expand All @@ -125,6 +128,7 @@ def on_connect(client, userdata, flags, rc):
connected_event.wait(timeout=10.0)
return client, queue


async def publish_ntp_time(client, topic):
"""
Publish the current NTP time to the specified MQTT topic.
Expand All @@ -140,8 +144,10 @@ async def publish_ntp_time(client, topic):
current_time = get_ntp_time() # Get the current time
published_time = round(current_time)
payload = json.dumps({"ntp_time": current_time})
client.publish(topic, payload, qos=1, retain = False)
client.publish(topic, payload, qos=1, retain=False)
print(f"Published NTP time: {current_time} to topic: {topic}")


async def publish_rec_time(client, topic):
"""
Publish the NTP time received by the motors to the specified MQTT topic.
Expand All @@ -154,10 +160,12 @@ async def publish_rec_time(client, topic):
The MQTT topic to publish the NTP time to.
"""
global received_time
payload = json.dumps({"received_time":received_time})
client.publish(topic, payload, qos=1, retain = False)
payload = json.dumps({"received_time": received_time})
client.publish(topic, payload, qos=1, retain=False)


async def main():
client, queue = get_client_and_queue(subscribe_topics,host,username, password)
client, queue = get_client_and_queue(subscribe_topics, host, username, password)
client.loop_start()
print("started")

Expand All @@ -166,15 +174,16 @@ async def main():
while True:
await asyncio.sleep(5)
elapsed_time = round(time() - start_time)
await publish_ntp_time(client,publish_topic)
await publish_ntp_time(client, publish_topic)
await publish_rec_time(client, publish_topic)
print(f"Elapsed: {elapsed_time}s")
if elapsed_time >= 600:
break
print("stopped")
client.loop_stop()
client.disconnect()


if __name__ == "__main__":
asyncio.run(main())
#need a method to indicate if it is time synced

# need a method to indicate if it is time synced
Loading