Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Logging): Add examples for providing logs #74

Merged
merged 12 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion examples/connector/golang/golang_connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"google.golang.org/grpc"
)

const INFO = "INFO"
const WARNING = "WARNING"
const SEVERE = "SEVERE"

var port = flag.Int("port", 50051, "The server port")

type MyState struct {
Expand All @@ -30,7 +34,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state := MyState{}
json.Unmarshal([]byte(state_json), &state)

log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state)
message := fmt.Sprintf("config: %s, selection: %s, state_json: %s, mystate: %s", config, selection, state_json, state)
fivetran-satvikpatil marked this conversation as resolved.
Show resolved Hide resolved
LogMessage(INFO, message)

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Expand Down Expand Up @@ -66,6 +71,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state.Cursor++
}

LogMessage(INFO, "Completed sending upsert records")

// -- Send UPDATE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -86,6 +93,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(INFO, "Completed sending update records")

// -- Send DELETE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -105,6 +114,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(WARNING, "Sample warning message: Completed sending delete records")

// Serialize state from struct to JSON string
new_state_json, _ := json.Marshal(state)
new_state := string(new_state_json)
Expand Down Expand Up @@ -133,6 +144,7 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
},
})

LogMessage(SEVERE, "Sample severe message: Update call completed")
// End the RPC call
return nil
}
Expand Down Expand Up @@ -258,6 +270,16 @@ func (s *server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse
}, nil
}

func LogMessage(level string, message string) {
fivetran-rishabhghosh marked this conversation as resolved.
Show resolved Hide resolved
log := map[string]interface{}{
"level": level,
"message": message,
"message-origin": "sdk_connector",
}
logJSON, _ := json.Marshal(log)
fmt.Println(string(logJSON))
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
import java.util.*;

public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase {
private final String INFO = "INFO";
private final String WARNING = "WARNING";
private final String SEVERE = "SEVERE";
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
logMessage(INFO, "Started fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -41,23 +45,28 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
ConfigurationTest.newBuilder().setName("select").setLabel("Tests selection").build()))
.build());

logMessage(INFO, "Fetching configuration form completed");
responseObserver.onCompleted();
}

@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {

Map<String, String> configuration = request.getConfigurationMap();

// Name of the test to be run
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("test name: %s", testName);
logMessage(INFO, message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

@Override
public void schema(SchemaRequest request, StreamObserver<SchemaResponse> responseObserver) {

logMessage(WARNING, "Sample warning message while fetching schema");
Map<String, String> configuration = request.getConfigurationMap();

TableList tableList = TableList.newBuilder()
Expand Down Expand Up @@ -177,10 +186,16 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse> respons
.build())
.build());
} catch (JsonProcessingException e) {
String message = e.getMessage();
logMessage(SEVERE, message);
responseObserver.onError(e);
}

// End the streaming RPC call
responseObserver.onCompleted();
}

private void logMessage(String level, String message) {
System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_connector\"}", level, message));
}
}
25 changes: 13 additions & 12 deletions examples/connector/nodejs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ const options = {
};
var packageDefinitionConnector = protoLoader.loadSync(PROTO_PATH_CONNECTOR, options);

const INFO = "INFO";
const WARNING = "WARNING";
const SEVERE = "SEVERE";

const protoDescriptor = grpc.loadPackageDefinition(packageDefinitionConnector);

const connectorSdkProto = protoDescriptor.fivetran_sdk;

const server = new grpc.Server();

const configurationForm = (call, callback) => {
logMessage(INFO, "Fetching configuration form")
callback(null, {
schema_selection_supported: true,
table_selection_supported: true,
Expand All @@ -39,12 +44,13 @@ const configurationForm = (call, callback) => {
const test = (call, callback) => {
const configuration = call.request.configuration;
const testName = call.request.name;
console.log(`Test name: ${testName}`);
logMessage(INFO, `Test name: ${testName}`)
callback(null, { success: true });
};

// Implement the Schema RPC method
const schema = (call, callback) => {
logMessage(INFO, "Fetching the schema from the implemented method")
const tableList = {
tables: [
{
Expand Down Expand Up @@ -77,15 +83,6 @@ const configurationForm = (call, callback) => {
call.write(response);
};

const sendLogEntry = (message) => {
sendResponse({
log_entry: {
level: "INFO",
message: message
}
});
};

const sendOperation = (operation) => {
sendResponse({
operation: operation
Expand All @@ -94,7 +91,7 @@ const configurationForm = (call, callback) => {

try {
// Send a log message
sendLogEntry("Sync STARTING");
logMessage(WARNING, "Sample Warning message: Sync STARTING");

// Send UPSERT records
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -145,7 +142,7 @@ const configurationForm = (call, callback) => {
});

// Send a log message
sendLogEntry("Sync DONE");
logMessage(SEVERE, "Sample severe message: Sync done")

} catch (error) {
callback(error);
Expand All @@ -155,6 +152,10 @@ const configurationForm = (call, callback) => {
call.end();
};

function logMessage(level, message) {
console.log(`{"level":"${level}", "message": "${message}", "message-origin": "sdk_connector"}`);
}


server.addService(connectorSdkProto.Connector.service, {configurationForm, test, schema, update})

Expand Down
19 changes: 16 additions & 3 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from sdk_pb2 import common_pb2
from sdk_pb2 import connector_sdk_pb2

INFO = "INFO"
WARNING = "WARNING"
SEVERE = "SEVERE"

class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
log_message(INFO, "Fetching configuration form")
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
Expand All @@ -35,8 +39,9 @@ def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)

log_message(INFO, "Test Name: " + str(test_name))

return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
Expand Down Expand Up @@ -114,6 +119,8 @@ def Update(self, request, context):
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

log_message(WARNING, "Completed sending update records")

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
Expand All @@ -139,6 +146,12 @@ def Update(self, request, context):
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)

log_message(SEVERE, "Sending severe log: Completed Update method")


def log_message(level, message):
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_connector"}}')


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand All @@ -152,4 +165,4 @@ def start_server():

if __name__ == '__main__':
print("Starting the server...")
start_server()
start_server()
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
import java.util.Map;

public class DestinationServiceImpl extends DestinationGrpc.DestinationImplBase {

private final String INFO = "INFO";
private final String WARNING = "WARNING";
private final String SEVERE = "SEVERE";

@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
logMessage(INFO, "Fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -45,7 +51,8 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("Test Name: %s", testName);
logMessage(INFO, message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
Expand All @@ -66,15 +73,17 @@ public void describeTable(DescribeTableRequest request, StreamObserver<DescribeT
).build()).build();

responseObserver.onNext(response);
logMessage(SEVERE, "Sample Severe log: Completed describe Table method");
responseObserver.onCompleted();
}

@Override
public void createTable(CreateTableRequest request, StreamObserver<CreateTableResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();

System.out.println("[CreateTable]: "
+ request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList());
String message = "[CreateTable]: "
+ request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList();
logMessage(INFO, message);
responseObserver.onNext(CreateTableResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}
Expand All @@ -83,8 +92,9 @@ public void createTable(CreateTableRequest request, StreamObserver<CreateTableRe
public void alterTable(AlterTableRequest request, StreamObserver<AlterTableResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();

System.out.println("[AlterTable]: " +
request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList());
String message = "[AlterTable]: " +
request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList();
logMessage(INFO, message);
responseObserver.onNext(AlterTableResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}
Expand All @@ -99,7 +109,8 @@ public void truncate(TruncateRequest request, StreamObserver<TruncateResponse> r

@Override
public void writeBatch(WriteBatchRequest request, StreamObserver<WriteBatchResponse> responseObserver) {
System.out.println("[WriteBatch]: " + request.getSchemaName() + " | " + request.getTable().getName());
String message = "[WriteBatch]: " + request.getSchemaName() + " | " + request.getTable().getName();
logMessage(WARNING, String.format("Sample severe message: %s", message));
for (String file : request.getReplaceFilesList()) {
System.out.println("Replace files: " + file);
}
Expand All @@ -112,4 +123,8 @@ public void writeBatch(WriteBatchRequest request, StreamObserver<WriteBatchRespo
responseObserver.onNext(WriteBatchResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

private void logMessage(String level, String message){
System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_destination\"}", level, message));
}
}
15 changes: 11 additions & 4 deletions examples/destination/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
from sdk_pb2 import destination_sdk_pb2_grpc


INFO = "INFO"
WARNING = "WARNING"
SEVERE = "SEVERE"

class DestinationImpl(destination_sdk_pb2_grpc.DestinationServicer):
def ConfigurationForm(self, request, context):

log_message(INFO, "Fetching Configuraiton form")
host = common_pb2.FormField(name="host", label="Host", required=True,
text_field=common_pb2.TextField.PlainText)
password = common_pb2.FormField(name="password", label="Password", required=True,
Expand All @@ -35,7 +39,7 @@ def ConfigurationForm(self, request, context):

def Test(self, request, context):
test_name = request.name
print("test name: " + test_name)
log_message(INFO, "test name: " + test_name)
return common_pb2.TestResponse(success=True)

def CreateTable(self, request, context):
Expand All @@ -60,13 +64,13 @@ def WriteBatch(self, request, context):
for delete_file in request.delete_files:
print("replace files: " + str(delete_file))

print("Data loading started for table " + request.table.name)
log_message(WARNING, "Data loading started for table " + request.table.name)
for key, value in request.keys.items():
print("----------------------------------------------------------------------------")
print("Decrypting and printing file :" + str(key))
print("----------------------------------------------------------------------------")
read_csv.decrypt_file(key, value)
print("\nData loading completed for table " + request.table.name + "\n")
log_message(INFO, "\nData loading completed for table " + request.table.name + "\n")

res: destination_sdk_pb2.WriteBatchResponse = destination_sdk_pb2.WriteBatchResponse(success=True)
return res
Expand All @@ -75,8 +79,11 @@ def DescribeTable(self, request, context):
column1 = common_pb2.Column(name="a1", type=common_pb2.DataType.UNSPECIFIED, primary_key=True)
column2 = common_pb2.Column(name="a2", type=common_pb2.DataType.DOUBLE)
table: common_pb2.Table = common_pb2.Table(name=request.table_name, columns=[column1, column2])
log_message(SEVERE, "Sample severe message: Completed fetching table info")
return destination_sdk_pb2.DescribeTableResponse(not_found=False, table=table)

def log_message(level, message):
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_destination"}}')

if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand Down
Loading