Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Logging): Add examples for providing logs #74

Merged
merged 12 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/connector/golang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ Run all commands from the golang folder root
> scripts/copy_protos.sh
> scripts/compile_protos.sh
> scripts/build.sh
> main
> ./main
```
15 changes: 8 additions & 7 deletions examples/connector/golang/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ module fivetran.com/fivetran_sdk
go 1.21

require (
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.35.1
)

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
github.com/golang/protobuf v1.5.4 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 // indirect
)
17 changes: 17 additions & 0 deletions examples/connector/golang/go.sum
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 h1:zciRKQ4kBpFgpfC5QQCVtnnNAcLIqweL7plyZRQHVpI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 h1:F29+wU6Ee6qgu9TddPgooOdaqsxTMunOoj8KA5yuS5A=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1/go.mod h1:5KF+wpkbTSbGcR9zteSqZV6fqFOWBl4Yde8En8MryZA=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
24 changes: 23 additions & 1 deletion examples/connector/golang/golang_connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"google.golang.org/grpc"
)

const INFO = "INFO"
const WARNING = "WARNING"
const SEVERE = "SEVERE"

var port = flag.Int("port", 50051, "The server port")

type MyState struct {
Expand All @@ -30,7 +34,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state := MyState{}
json.Unmarshal([]byte(state_json), &state)

log.Println("config: ", config, "selection: ", selection, "state_json: ", state_json, "mystate: ", state)
message := fmt.Sprintf("config: %s, selection: %s, state_json: %s, mystate: %s", config, selection, state_json, state)
fivetran-satvikpatil marked this conversation as resolved.
Show resolved Hide resolved
LogMessage(INFO, message)

// -- Send a log message
stream.Send(&pb.UpdateResponse{
Expand Down Expand Up @@ -66,6 +71,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
state.Cursor++
}

LogMessage(INFO, "Completed sending upsert records")

// -- Send UPDATE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -86,6 +93,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(INFO, "Completed sending update records")

// -- Send DELETE record
stream.Send(&pb.UpdateResponse{
Response: &pb.UpdateResponse_Operation{
Expand All @@ -105,6 +114,8 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
})
state.Cursor++

LogMessage(WARNING, "Sample warning message: Completed sending delete records")

// Serialize state from struct to JSON string
new_state_json, _ := json.Marshal(state)
new_state := string(new_state_json)
Expand Down Expand Up @@ -133,6 +144,7 @@ func (s *server) Update(in *pb.UpdateRequest, stream pb.Connector_UpdateServer)
},
})

LogMessage(SEVERE, "Sample severe message: Update call completed")
// End the RPC call
return nil
}
Expand Down Expand Up @@ -258,6 +270,16 @@ func (s *server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse
}, nil
}

func LogMessage(level string, message string) {
fivetran-rishabhghosh marked this conversation as resolved.
Show resolved Hide resolved
log := map[string]interface{}{
"level": level,
"message": message,
"message-origin": "sdk_connector",
}
logJSON, _ := json.Marshal(log)
fmt.Println(string(logJSON))
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
Expand Down
2 changes: 1 addition & 1 deletion examples/connector/golang/scripts/compile_protos.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
protoc \
PATH="${PATH}:${HOME}/go/bin" protoc \
--proto_path=proto \
--go_out=proto \
--go_opt=paths=source_relative \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
import java.util.*;

public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase {
private final String INFO = "INFO";
private final String WARNING = "WARNING";
private final String SEVERE = "SEVERE";
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
logMessage(INFO, "Started fetching configuration form");
responseObserver.onNext(
ConfigurationFormResponse.newBuilder()
.setSchemaSelectionSupported(true)
Expand Down Expand Up @@ -41,23 +45,28 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
ConfigurationTest.newBuilder().setName("select").setLabel("Tests selection").build()))
.build());

logMessage(INFO, "Fetching configuration form completed");
responseObserver.onCompleted();
}

@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {

Map<String, String> configuration = request.getConfigurationMap();

// Name of the test to be run
String testName = request.getName();
System.out.println("test name: " + testName);
String message = String.format("test name: %s", testName);
logMessage(INFO, message);

responseObserver.onNext(TestResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}

@Override
public void schema(SchemaRequest request, StreamObserver<SchemaResponse> responseObserver) {

logMessage(WARNING, "Sample warning message while fetching schema");
Map<String, String> configuration = request.getConfigurationMap();

TableList tableList = TableList.newBuilder()
Expand Down Expand Up @@ -177,10 +186,16 @@ public void update(UpdateRequest request, StreamObserver<UpdateResponse> respons
.build())
.build());
} catch (JsonProcessingException e) {
String message = e.getMessage();
logMessage(SEVERE, message);
responseObserver.onError(e);
}

// End the streaming RPC call
responseObserver.onCompleted();
}

private void logMessage(String level, String message) {
System.out.println(String.format("{\"level\":\"%s\", \"message\": \"%s\", \"message-origin\": \"sdk_connector\"}", level, message));
}
}
25 changes: 13 additions & 12 deletions examples/connector/nodejs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ const options = {
};
var packageDefinitionConnector = protoLoader.loadSync(PROTO_PATH_CONNECTOR, options);

const INFO = "INFO";
const WARNING = "WARNING";
const SEVERE = "SEVERE";

const protoDescriptor = grpc.loadPackageDefinition(packageDefinitionConnector);

const connectorSdkProto = protoDescriptor.fivetran_sdk;

const server = new grpc.Server();

const configurationForm = (call, callback) => {
logMessage(INFO, "Fetching configuration form")
callback(null, {
schema_selection_supported: true,
table_selection_supported: true,
Expand All @@ -39,12 +44,13 @@ const configurationForm = (call, callback) => {
const test = (call, callback) => {
const configuration = call.request.configuration;
const testName = call.request.name;
console.log(`Test name: ${testName}`);
logMessage(INFO, `Test name: ${testName}`)
callback(null, { success: true });
};

// Implement the Schema RPC method
const schema = (call, callback) => {
logMessage(INFO, "Fetching the schema from the implemented method")
const tableList = {
tables: [
{
Expand Down Expand Up @@ -77,15 +83,6 @@ const configurationForm = (call, callback) => {
call.write(response);
};

const sendLogEntry = (message) => {
sendResponse({
log_entry: {
level: "INFO",
message: message
}
});
};

const sendOperation = (operation) => {
sendResponse({
operation: operation
Expand All @@ -94,7 +91,7 @@ const configurationForm = (call, callback) => {

try {
// Send a log message
sendLogEntry("Sync STARTING");
logMessage(WARNING, "Sample Warning message: Sync STARTING");

// Send UPSERT records
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -145,7 +142,7 @@ const configurationForm = (call, callback) => {
});

// Send a log message
sendLogEntry("Sync DONE");
logMessage(SEVERE, "Sample severe message: Sync done")

} catch (error) {
callback(error);
Expand All @@ -155,6 +152,10 @@ const configurationForm = (call, callback) => {
call.end();
};

function logMessage(level, message) {
console.log(`{"level":"${level}", "message": "${message}", "message-origin": "sdk_connector"}`);
}


server.addService(connectorSdkProto.Connector.service, {configurationForm, test, schema, update})

Expand Down
19 changes: 16 additions & 3 deletions examples/connector/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from sdk_pb2 import common_pb2
from sdk_pb2 import connector_sdk_pb2

INFO = "INFO"
WARNING = "WARNING"
SEVERE = "SEVERE"

class ConnectorService(connector_sdk_pb2_grpc.ConnectorServicer):
def ConfigurationForm(self, request, context):
log_message(INFO, "Fetching configuration form")
form_fields = common_pb2.ConfigurationFormResponse(schema_selection_supported=True,
table_selection_supported=True)
form_fields.fields.add(name="apiKey", label="API Key", required=True, text_field=common_pb2.TextField.PlainText)
Expand All @@ -35,8 +39,9 @@ def Test(self, request, context):
configuration = request.configuration
# Name of the test to be run
test_name = request.name
print("Configuration: ", configuration)
print("Test name: ", test_name)

log_message(INFO, "Test Name: " + str(test_name))

return common_pb2.TestResponse(success=True)

def Schema(self, request, context):
Expand Down Expand Up @@ -114,6 +119,8 @@ def Update(self, request, context):
yield connector_sdk_pb2.UpdateResponse(operation=operation)
state["cursor"] += 1

log_message(WARNING, "Completed sending update records")

# -- Send DELETE record
operation = connector_sdk_pb2.Operation()
val1 = common_pb2.ValueType()
Expand All @@ -139,6 +146,12 @@ def Update(self, request, context):
log.message = "Sync Done"
yield connector_sdk_pb2.UpdateResponse(log_entry=log)

log_message(SEVERE, "Sending severe log: Completed Update method")


def log_message(level, message):
print(f'{{"level":"{level}", "message": "{message}", "message-origin": "sdk_connector"}}')


def start_server():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
Expand All @@ -152,4 +165,4 @@ def start_server():

if __name__ == '__main__':
print("Starting the server...")
start_server()
start_server()
Loading
Loading