Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Nov 15, 2024
1 parent 41bb575 commit 154b13a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.util.Strings;


/**
Expand Down Expand Up @@ -835,7 +834,7 @@ public void setNearlineProducerCountPerWriter(int nearlineProducerCountPerWriter
}

public String getRealTimeTopicName() {
return Strings.isBlank(realTimeTopicName) ? Version.composeRealTimeTopic(getName()) : realTimeTopicName;
return Version.getRealTimeTopicNameIfEmpty(realTimeTopicName, getName());
}

public void setRealTimeTopicName(String realTimeTopicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public interface Version extends Comparable<Version>, DataModelBackedStructure<S
*/
String VENICE_RE_PUSH_PUSH_ID_PREFIX = "venice_re_push_";

static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) {
if (Strings.isBlank(realTimeTopicName)) {
return composeRealTimeTopic(storeName);
} else {
return realTimeTopicName;
}
}

/**
* Producer type for writing data to Venice
*/
Expand Down Expand Up @@ -299,21 +307,12 @@ static String composeRealTimeTopic(String storeName) {
}

static String getRealTimeTopicName(Store store) {
String realTimeTopicName = store.getRealTimeTopicName();
if (Strings.isBlank(realTimeTopicName)) {
return composeRealTimeTopic(store.getName());
} else {
return realTimeTopicName;
}
return store.getRealTimeTopicName();
}

static String getRealTimeTopicName(StoreInfo storeInfo) {
String realTimeTopicName = storeInfo.getRealTimeTopicName();
if (Strings.isBlank(realTimeTopicName)) {
return composeRealTimeTopic(storeInfo.getName());
} else {
return realTimeTopicName;
}
return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeInfo.getName());
}

static String getRealTimeTopicName(Version version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.logging.log4j.util.Strings;


/**
Expand Down Expand Up @@ -413,11 +412,7 @@ public void setRmdVersionId(int replicationMetadataVersionId) {
@Override
public String getRealTimeTopicName() {
String realTimeTopicName = this.storeVersion.realTimeTopicName.toString();
if (Strings.isBlank(realTimeTopicName)) {
return Version.composeRealTimeTopic(this.getStoreName());
} else {
return realTimeTopicName;
}
return Version.getRealTimeTopicNameIfEmpty(realTimeTopicName, getStoreName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,8 @@ public void setNearlineProducerCountPerWriter(int producerCnt) {

@Override
public String getRealTimeTopicName() {
return this.storeProperties.realTimeTopicName.toString();
String realTimeTopicName = this.storeProperties.realTimeTopicName.toString();
return Version.getRealTimeTopicNameIfEmpty(realTimeTopicName, getName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ public void testUpdateAndReadStore() {
public void testLoadFromZK() {
repo.refresh();
Store s1 = TestUtils.createTestStore("s1", "owner", System.currentTimeMillis());
s1.addVersion(new VersionImpl(s1.getName(), s1.getLargestUsedVersionNumber() + 1, "pushJobId"));
s1.addVersion(
new VersionImpl(s1.getName(), s1.getLargestUsedVersionNumber() + 1, "pushJobId", s1.getRealTimeTopicName()));
s1.setReadQuotaInCU(100);
repo.addStore(s1);
Store s2 = TestUtils.createTestStore("s2", "owner", System.currentTimeMillis());
s2.addVersion(new VersionImpl(s2.getName(), 3, s1.getRealTimeTopicName()));
s2.addVersion(new VersionImpl(s2.getName(), 3, s2.getRealTimeTopicName()));
s2.setReadQuotaInCU(200);
repo.addStore(s2);

Expand Down

0 comments on commit 154b13a

Please sign in to comment.