Skip to content

Commit

Permalink
updated the connector code
Browse files Browse the repository at this point in the history
  • Loading branch information
manjutapali committed Apr 1, 2024
1 parent 80eb642 commit 6785efd
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import grpc
from concurrent import futures
import requests
import json
import google.protobuf.timestamp_pb2 as timestamp_pb2
import datetime


Expand Down Expand Up @@ -32,20 +30,25 @@ def Test(self, request, context):
def Schema(self, request, context):
table_list = common__pb2.TableList()
t1 = table_list.tables.add()
t1.name = "london_subway"
t1.name = "table1"
c1 = t1.columns.add()
c1.name = "id"
c1.name = "a1"
c1.type = common__pb2.DataType.STRING
c1.primary_key = True
c2 = t1.columns.add()
c2.name = "a2"
c2.type = common__pb2.DataType.DOUBLE

t2 = table_list.tables.add()
t2.name = "table2"
c1 = t2.columns.add()
c1.name = "b1"
c1.type = common__pb2.DataType.STRING
c1.primary_key = True
c2 = t1.columns.add()
c2.name = "linestatus"
c2.type = common__pb2.DataType.STRING
c2.name = "b2"
c2.type = common__pb2.DataType.UNSPECIFIED

c3 = t1.columns.add()
c3.name = "timestamp"
c3.type = common__pb2.DataType.UTC_DATETIME
c3.primary_key = True

response = connector__sdk__pb2.SchemaResponse(without_schema=table_list)
return response
Expand All @@ -56,40 +59,71 @@ def Update(self, request, context):
if request.HasField('state_json'):
state_json = request.state_json

# Read london subway API response and update the table with line status
result = requests.get('https://api.tfl.gov.uk/line/mode/tube/status',
headers={"content-type": "application/json", "charset": "utf-8"})

# # Read london subway API response and update the table with line status
# result = requests.get('https://api.tfl.gov.uk/line/mode/tube/status',
# headers={"content-type": "application/json", "charset": "utf-8"})
#
#
# timeline = json.loads(result.text)

timeline = json.loads(result.text)

for t in timeline:
# -- Send UPSERT records
for t in range(0, 3):
response = connector__sdk__pb2.UpdateResponse()
operation = response.operation
val1 = common__pb2.ValueType()
val1.string = t["id"]
val1.string = "a-" + str(t)

val2 = common__pb2.ValueType()
val2.string = t["lineStatuses"][0]["statusSeverityDescription"]

val3 = common__pb2.ValueType()
time_in_sec = convert_string_to_utc_seconds(t["created"])
utc_datetime = timestamp_pb2.Timestamp(seconds=time_in_sec)
val3.utc_datetime.CopyFrom(utc_datetime)
val2.double = t * 0.234

print("Values are assigned", val1.string)

record = connector__sdk__pb2.Record()
record.type = common__pb2.OpType.UPSERT
record.table_name="london_subway"
record.data["id"].CopyFrom(val1)
record.data["linestatus"].CopyFrom(val2)
record.data["timestamp"].CopyFrom(val3)

record.table_name="table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)

operation.record.CopyFrom(record)
yield connector__sdk__pb2.UpdateResponse(operation=operation)

print("Upserted records")

# -- Send UPDATE record
response = connector__sdk__pb2.UpdateResponse()
operation = response.operation
val1 = common__pb2.ValueType()
val1.string = "a-0"

val2 = common__pb2.ValueType()
val2.double = 110.234

record = connector__sdk__pb2.Record()
record.type = common__pb2.OpType.UPDATE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)
record.data["a2"].CopyFrom(val2)

operation.record.CopyFrom(record)
yield connector__sdk__pb2.UpdateResponse(operation=operation)

print("Updated record")

# -- Send DELETE record
response = connector__sdk__pb2.UpdateResponse()
operation = response.operation
val1 = common__pb2.ValueType()
val1.string = "a-2"

record = connector__sdk__pb2.Record()
record.type = common__pb2.OpType.DELETE
record.table_name = "table1"
record.data["a1"].CopyFrom(val1)

operation.record.CopyFrom(record)
yield connector__sdk__pb2.UpdateResponse(operation=operation)
print("deleted the record")



def convert_string_to_utc_seconds(datetime_str):
Expand Down
File renamed without changes.

0 comments on commit 6785efd

Please sign in to comment.