Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Logging): Add examples for providing logs #74

Merged
merged 12 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion examples/connector/golang/golang_connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state := MyState{}
json.Unmarshal([]byte(state_json), &state)

log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state)
message := fmt.Sprintf("config: %s, selection: %s, state_json: %s, mystate: %s", config, selection, state_json, state)
fivetran-satvikpatil marked this conversation as resolved.
Show resolved Hide resolved
LogMessage("INFO", message)

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Expand Down Expand Up @@ -66,6 +67,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state.Cursor++
}

LogMessage("INFO", "Completed sending upsert records")

// -- Send UPDATE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -86,6 +89,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage("INFO", "Completed sending update records")

// -- Send DELETE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -105,6 +110,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage("INFO", "Completed sending delete records")

// Serialize state from struct to JSON string
new_state_json, _ := json.Marshal(state)
new_state := string(new_state_json)
Expand Down Expand Up @@ -133,6 +140,7 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
},
})

LogMessage("SEVERE", "Sample severe message: Update call completed")
// End the RPC call
return nil
}
Expand Down Expand Up @@ -258,6 +266,16 @@ func (s *server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse
}, nil
}

func LogMessage(level string, message string) {
fivetran-rishabhghosh marked this conversation as resolved.
Show resolved Hide resolved
log := map[string]interface{}{
"level": level,
"message": message,
"message-origin": "sdk_connector",
}
logJSON, _ := json.Marshal(log)
fmt.Println(string(logJSON))
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
import java.util.*;

public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase {
private final String INFO = "INFO";
private final String WARNING = "WARNING";
private final String SEVERE = "SEVERE";
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
print(INFO, "Started fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -41,23 +45,28 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
ConfigurationTest.newBuilder().setName("select").setLabel("Tests selection").build()))
.build());

print(INFO, "Fetching configuration form completed");
responseObserver.onCompleted();
}

@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {

Map<String, String> configuration = request.getConfigurationMap();

// Name of the test to be run
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("test name: %s", testName);
print(INFO, message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

@Override
public void schema(SchemaRequest request, StreamObserver<SchemaResponse> responseObserver) {

print(WARNING, "Sample warning message while fetching schema");
Map<String, String> configuration = request.getConfigurationMap();

TableList tableList = TableList.newBuilder()
Expand Down Expand Up @@ -177,10 +186,16 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse> respons
.build())
.build());
} catch (JsonProcessingException e) {
String message = e.getMessage();
print(SEVERE, message);
responseObserver.onError(e);
}

// End the streaming RPC call
responseObserver.onCompleted();
}

private void print(String level, String message){
System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_connector\"}", level, message));
}
}
21 changes: 9 additions & 12 deletions examples/connector/nodejs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const connectorSdkProto = protoDescriptor.fivetran_sdk;
const server = new grpc.Server();

const configurationForm = (call, callback) => {
logMessage("INFO", "Fetching configuration form")
fivetran-satvikpatil marked this conversation as resolved.
Show resolved Hide resolved
callback(null, {
schema_selection_supported: true,
table_selection_supported: true,
Expand All @@ -39,12 +40,13 @@ const configurationForm = (call, callback) => {
const test = (call, callback) => {
const configuration = call.request.configuration;
const testName = call.request.name;
console.log(`Test name: ${testName}`);
logMessage("INFO", `Test name: ${testName}`)
callback(null, { success: true });
};

// Implement the Schema RPC method
const schema = (call, callback) => {
logMessage("INFO", "Fetching the schema from the implemented method")
const tableList = {
tables: [
{
Expand Down Expand Up @@ -77,15 +79,6 @@ const configurationForm = (call, callback) => {
call.write(response);
};

const sendLogEntry = (message) => {
sendResponse({
log_entry: {
level: "INFO",
message: message
}
});
};

const sendOperation = (operation) => {
sendResponse({
operation: operation
Expand All @@ -94,7 +87,7 @@ const configurationForm = (call, callback) => {

try {
// Send a log message
sendLogEntry("Sync STARTING");
logMessage("INFO", "Sync STARTING");

// Send UPSERT records
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -145,7 +138,7 @@ const configurationForm = (call, callback) => {
});

// Send a log message
sendLogEntry("Sync DONE");
logMessage("SEVERE", "Sample severe message: Sync done")

} catch (error) {
callback(error);
Expand All @@ -155,6 +148,10 @@ const configurationForm = (call, callback) => {
call.end();
};

function logMessage(level, message) {
console.log(`{"level":"${level}", "message": "${message}", "message-origin": "sdk_connector"}`);
}


server.addService(connectorSdkProto.Connector.service, {configurationForm, test, schema, update})

Expand Down
16 changes: 13 additions & 3 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
print_log("INFO", "Fetching configuration form")
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
Expand All @@ -35,8 +36,9 @@ def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)

print_log("INFO", "Test Name: " + str(test_name))

return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
Expand Down Expand Up @@ -114,6 +116,8 @@ def Update(self, request, context):
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

print_log("WARNING", "Completed sending update records")

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
Expand All @@ -139,6 +143,12 @@ def Update(self, request, context):
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)

print_log("SEVERE", "Sending severe log: Completed Update method")


def print_log(level, message):
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_connector"}}')


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand All @@ -152,4 +162,4 @@ def start_server():

if __name__ == '__main__':
print("Starting the server...")
start_server()
start_server()
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
public class DestinationServiceImpl extends DestinationGrpc.DestinationImplBase {
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
print("INFO", "Fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -45,7 +46,8 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("Test Name: %s", testName);
print("INFO", message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
Expand All @@ -66,15 +68,17 @@ public void describeTable(DescribeTableRequest request, StreamObserver<DescribeT
).build()).build();

responseObserver.onNext(response);
print("SEVERE", "Sample Severe log: Completed describe Table method");
responseObserver.onCompleted();
}

@Override
public void createTable(CreateTableRequest request, StreamObserver<CreateTableResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();

System.out.println("[CreateTable]: "
+ request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList());
String message = "[CreateTable]: "
+ request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList();
print("INFO", message);
responseObserver.onNext(CreateTableResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}
Expand All @@ -83,8 +87,9 @@ public void createTable(CreateTableRequest request, StreamObserver<CreateTableRe
public void alterTable(AlterTableRequest request, StreamObserver<AlterTableResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();

System.out.println("[AlterTable]: " +
request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList());
String message = "[AlterTable]: " +
request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList();
print("INFO", message);
responseObserver.onNext(AlterTableResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -112,4 +117,8 @@ public void writeBatch(WriteBatchRequest request, StreamObserver<WriteBatchRespo
responseObserver.onNext(WriteBatchResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

private void print(String level, String message){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: rename logMessage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it

System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_destination\"}", level, message));
}
}
10 changes: 6 additions & 4 deletions examples/destination/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer):
def ConfigurationForm(self, request, context):

print_log("INFO", "Fetching Configuraiton form")
host = common_pb2.FormField(name="host", label="Host", required=True,
text_field=common_pb2.TextField.PlainText)
password = common_pb2.FormField(name="password", label="Password", required=True,
Expand All @@ -35,7 +35,7 @@ def ConfigurationForm(self, request, context):

def Test(self, request, context):
test_name = request.name
print("test name: " + test_name)
print_log("INFO", "test name: " + test_name)
return common_pb2.TestResponse(success=True)

def CreateTable(self, request, context):
Expand All @@ -60,13 +60,13 @@ def WriteBatch(self, request, context):
for delete_file in request.delete_files:
print("replace files: " + str(delete_file))

print("Data loading started for table " + request.table.name)
print_log("INFO", "Data loading started for table " + request.table.name)
for key, value in request.keys.items():
print("----------------------------------------------------------------------------")
print("Decrypting and printing file :" + str(key))
print("----------------------------------------------------------------------------")
read_csv.decrypt_file(key, value)
print("\nData loading completed for table " + request.table.name + "\n")
print_log("INFO", "\nData loading completed for table " + request.table.name + "\n")

res: destination_sdk_pb2.WriteBatchResponse = destination_sdk_pb2.WriteBatchResponse(success=True)
return res
Expand All @@ -77,6 +77,8 @@ def DescribeTable(self, request, context):
table: common_pb2.Table = common_pb2.Table(name=request.table_name, columns=[column1, column2])
return destination_sdk_pb2.DescribeTableResponse(not_found=False, table=table)

def print_log(level, message):
fivetran-satvikpatil marked this conversation as resolved.
Show resolved Hide resolved
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_destination"}}')

if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand Down
Loading