Skip to content

Commit

Permalink
Merge pull request #4839 from dilanSachi/update-kafka-bbes
Browse files Browse the repository at this point in the history
[master] Update kafka BBEs
  • Loading branch information
dilanSachi authored Aug 25, 2023
2 parents a9ab2ef + 63848e0 commit a414c9f
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import ballerina/constraint;
import ballerinax/kafka;
import ballerina/io;

public type Order record {
type Order record {
int orderId;
// Add a constraint to only allow string values of length between 30 and 1.
@constraint:String {maxLength: 30, minLength: 1}
Expand All @@ -19,7 +19,7 @@ public function main() returns error? {

while true {
Order[] orders = check orderConsumer->pollPayload(15);
check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
io:println(string `Received valid order for ${'order.productName}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import ballerinax/kafka;
import ballerina/io;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
boolean isValid;
};

// Create a subtype of `kafka:AnydataConsumerRecord`.
public type OrderConsumerRecord record {|
type OrderConsumerRecord record {|
*kafka:AnydataConsumerRecord;
Order value;
|};
Expand All @@ -23,7 +23,7 @@ public function main() returns error? {
while true {
// Polls the consumer for order records.
OrderConsumerRecord[] records = check orderConsumer->poll(15);
check from OrderConsumerRecord orderRecord in records
from OrderConsumerRecord orderRecord in records
where orderRecord.value.isValid
do {
io:println(string `Received valid order for ${orderRecord.value.productName}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/io;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -17,7 +17,7 @@ public function main() returns error? {
while true {
// Polls the consumer for payload.
Order[] orders = check orderConsumer->pollPayload(15);
check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
io:println(string `Received valid order for ${'order.productName}`);
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka-consumer-sasl/kafka_consumer_sasl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/io;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -27,7 +27,7 @@ public function main() returns error? {
// Polls the consumer for payload.
Order[] orders = check orderConsumer->pollPayload(1);

check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
io:println(string `Received valid order for ${'order.productName}`);
Expand Down
4 changes: 2 additions & 2 deletions examples/kafka-consumer-ssl/kafka_consumer_ssl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/io;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -27,7 +27,7 @@ public function main() returns error? {
// Polls the consumer for payload.
Order[] orders = check orderConsumer->pollPayload(1);

check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
io:println(string `Received valid order for ${'order.productName}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/http;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -15,7 +15,7 @@ service / on new http:Listener(9090) {
self.orderProducer = check new (kafka:DEFAULT_URL);
}

resource function post orders(@http:Payload anydata newOrder) returns http:Accepted|error {
resource function post orders(Order newOrder) returns http:Accepted|error {
check self.orderProducer->send({
topic: "order-topic",
value: newOrder
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-producer-sasl/kafka_producer_sasl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerina/http;
import ballerinax/kafka;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka-producer-ssl/kafka_producer_ssl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerina/http;
import ballerinax/kafka;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import ballerina/constraint;
import ballerinax/kafka;
import ballerina/log;

public type Order record {
type Order record {
int orderId;
// Add a constraint to only allow string values of length between 30 and 1.
@constraint:String {maxLength: 30, minLength: 1}
Expand All @@ -18,8 +18,8 @@ listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {

service on orderListener {

remote function onConsumerRecord(Order[] orders) returns error? {
check from Order 'order in orders
remote function onConsumerRecord(Order[] orders) {
from Order 'order in orders
where 'order.isValid
do {
log:printInfo(string `Received valid order for ${'order.productName}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/log;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -15,9 +15,9 @@ listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {

service on orderListener {

remote function onConsumerRecord(Order[] orders) returns error? {
remote function onConsumerRecord(Order[] orders) {
// The set of orders received by the service are processed one by one.
check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
log:printInfo(string `Received valid order for ${'order.productName}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/log;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -17,9 +17,9 @@ listener kafka:Listener orderListener = new (kafka:DEFAULT_URL, {

service on orderListener {

remote function onConsumerRecord(Order[] orders) returns error? {
remote function onConsumerRecord(Order[] orders) {
// The set of orders received by the service are processed one by one.
check from Order 'order in orders
from Order 'order in orders
where 'order.isValid
do {
log:printInfo(string `Received valid order for ${'order.productName}`);
Expand All @@ -42,4 +42,5 @@ service on orderListener {
} else {
log:printError("An error occured", 'error);
}
}
}
6 changes: 3 additions & 3 deletions examples/kafka-service-sasl/kafka_service_sasl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/log;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -26,8 +26,8 @@ listener kafka:Listener orderListener = new ("localhost:9093", {

service on orderListener {

remote function onConsumerRecord(Order[] orders) returns error? {
check from Order 'order in orders
remote function onConsumerRecord(Order[] orders) {
from Order 'order in orders
where 'order.isValid
do {
log:printInfo(string `Received valid order for ${'order.productName}`);
Expand Down
6 changes: 3 additions & 3 deletions examples/kafka-service-ssl/kafka_service_ssl.bal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ballerinax/kafka;
import ballerina/log;

public type Order readonly & record {
type Order readonly & record {
int orderId;
string productName;
decimal price;
Expand All @@ -25,8 +25,8 @@ listener kafka:Listener orderListener = new ("localhost:9094", {

service on orderListener {

remote function onConsumerRecord(Order[] orders) returns error? {
check from Order 'order in orders
remote function onConsumerRecord(Order[] orders) {
from Order 'order in orders
where 'order.isValid
do {
log:printInfo(string `Received valid order for ${'order.productName}`);
Expand Down

0 comments on commit a414c9f

Please sign in to comment.