Skip to content

Commit

Permalink
Release RapidMiner Belt 1.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Gisa Meier committed Aug 11, 2021
1 parent 8c5c4c3 commit 73c70b4
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.rapidminer</groupId>
<artifactId>belt</artifactId>
<version>1.0.0</version>
<version>1.0.1</version>
<packaging>jar</packaging>
<name>belt</name>
<organization>
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/com/rapidminer/belt/execution/SequentialContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright (C) 2001-2021 by RapidMiner and the contributors
*
* Complete list of developers available at our web site:
*
* http://rapidminer.com
*
* This program is free software: you can redistribute it and/or modify it under the terms of the
* GNU Affero General Public License as published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along with this program.
* If not, see http://www.gnu.org/licenses/.
*/
package com.rapidminer.belt.execution;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;


/**
 * A minimal {@link Context} for belt calculations that runs every submitted task one after another on the calling
 * thread and therefore reports a parallelism of {@code 1}.
 *
 * <p>Calling {@link #stop()} flips the flag reported by {@link #isActive()} to {@code false}. Note that
 * {@link #call(List)} itself never consults that flag; tasks are expected to check the context state themselves.
 *
 * @author Gisa Meier
 * @since 1.0.1
 */
public class SequentialContext implements Context {

	/** Written by {@link #stop()}, read by {@link #isActive()}; volatile so the change is visible across threads. */
	private volatile boolean active = true;

	/**
	 * @return {@code true} until {@link #stop()} has been called
	 */
	@Override
	public boolean isActive() {
		return active;
	}

	/**
	 * @return always {@code 1}, since all tasks run on the calling thread
	 */
	@Override
	public int getParallelism() {
		return 1;
	}

	/**
	 * Runs the given tasks in list order on the calling thread and collects their results in the same order.
	 *
	 * @param callables
	 * 		the tasks to run, must neither be {@code null} nor contain {@code null}
	 * @return the task results in task order, or an empty list if no tasks were given
	 * @throws NullPointerException
	 * 		if the list is or contains {@code null}; the check happens before any task is started
	 * @throws ExecutionException
	 * 		wrapping the first {@link Exception} thrown by a task; later tasks are not started
	 */
	@Override
	public <T> List<T> call(List<Callable<T>> callables) throws ExecutionException {
		if (callables == null) {
			throw new NullPointerException("callables must not be null");
		}
		if (callables.isEmpty()) {
			// nothing to run
			return Collections.emptyList();
		}
		// validate the whole list up front so that no task runs when a later entry is null
		if (callables.stream().anyMatch(task -> task == null)) {
			throw new NullPointerException("callables must not contain null");
		}

		List<T> outcomes = new ArrayList<>();
		for (Callable<T> task : callables) {
			outcomes.add(runWrapped(task));
		}
		return outcomes;
	}

	/**
	 * Stops the execution for this context by marking it as inactive.
	 */
	public void stop() {
		active = false;
	}

	/**
	 * Invokes a single task and converts any {@link Exception} it throws into an {@link ExecutionException}.
	 */
	private static <T> T runWrapped(Callable<T> task) throws ExecutionException {
		try {
			return task.call();
		} catch (Exception e) {
			throw new ExecutionException(e);
		}
	}

}
68 changes: 41 additions & 27 deletions src/main/java/com/rapidminer/belt/table/Appender.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.DoubleConsumer;
Expand All @@ -37,7 +35,6 @@
import com.rapidminer.belt.execution.ExecutionUtils;
import com.rapidminer.belt.reader.ObjectReader;
import com.rapidminer.belt.reader.Readers;
import com.rapidminer.belt.util.ColumnMetaData;
import com.rapidminer.belt.util.IntegerFormats;


Expand Down Expand Up @@ -156,13 +153,6 @@ public static Table append(List<Table> tables, DoubleConsumer progressCallback,
//return empty table with height 0
return new Table(0);
}
DoubleConsumer progressConsumer;
if (progressCallback == null) {
progressConsumer = v -> {
};
} else {
progressConsumer = progressCallback;
}

// checks if all tables have the same column names
checkForCompatibility(tables);
Expand All @@ -187,35 +177,59 @@ public static Table append(List<Table> tables, DoubleConsumer progressCallback,
return new Table(finalSize);
}

String[] labels = tables.get(0).labelArray();
String[] labels = firstTable.labelArray();
Column[] newColumns = new Column[labels.length];
Map<String, List<ColumnMetaData>> metaDataMap = new HashMap<>();

List<Column> columns = new ArrayList<>(width);
int index = 0;
for (String label : firstTable.labelArray()) {
columns.clear();
DoubleConsumer intermediateConsumer;
if (progressCallback == null) {
intermediateConsumer = d -> {};
} else {
intermediateConsumer = new DoubleConsumer() {

private double previousValue = 0;

@Override
public void accept(double value) {
// without synchronization some updates in this method will get lost, but the reported progress is good enough
previousValue += value;
double sendValue = previousValue;
if (sendValue <= 1) {
// due to double imprecision, values slightly bigger than 1 can happen
progressCallback.accept(sendValue);
}
}
};
}

ExecutionUtils.parallel(0, labels.length, index -> {
String label = labels[index];
List<Column> columns = new ArrayList<>(width);
for (Table table : tables) {
columns.add(table.column(label));
}
final int currentIndex = index;
try {
newColumns[index] =
append(columns, finalSize, v -> progressConsumer.accept((currentIndex + v) / width), context);
append(columns, finalSize, progressCallback == null ? intermediateConsumer :
new DoubleConsumer() {

private double previousValue = 0;

@Override
public void accept(double value) {
intermediateConsumer.accept((value - previousValue) / width);
previousValue = value;
}
}
, context);
} catch (IncompatibleTypesException e) {
e.setColumnName(label);
throw e;
}

List<ColumnMetaData> oldMetaData = firstTable.getMetaData(label);
if (!oldMetaData.isEmpty()) {
metaDataMap.put(label, oldMetaData);
}
index++;
progressConsumer.accept(index / (double) width);
context.requireActive();
}, context);
if (progressCallback != null) {
progressCallback.accept(1);
}
return new Table(newColumns, labels, metaDataMap);
return new Table(newColumns, labels, firstTable.getMetaData());

}

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/com/rapidminer/belt/table/Tables.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,36 @@ public static Map<String, Incompatibility> findIncompatible(Table table, Table s
return incompatibles;
}

/**
 * Compacts the dictionaries of all nominal columns that have gaps. A dictionary is treated as having gaps when
 * its size differs from its maximal index.
 *
 * @param table
 * 		the table with the dictionaries to compact
 * @return a new table (same labels and meta data) in which every affected column has been replaced by its
 * compacted version, or the very same table instance if no column needed compacting
 * @since 1.0.1; moved from Belt Adapter
 */
public static Table compactDictionaries(Table table) {
	Column[] source = table.getColumns();
	// created lazily so that a table without gaps is returned untouched
	Column[] replaced = null;
	for (int position = 0; position < source.length; position++) {
		Column candidate = source[position];
		if (candidate.type().id() != Column.TypeId.NOMINAL) {
			continue;
		}
		Dictionary dictionary = candidate.getDictionary();
		if (dictionary.size() == dictionary.maximalIndex()) {
			// no gaps, keep the column as it is
			continue;
		}
		if (replaced == null) {
			replaced = Arrays.copyOf(table.getColumns(), table.width());
		}
		replaced[position] = Columns.compactDictionary(candidate);
	}
	return replaced == null ? table : new Table(replaced, table.labelArray(), table.getMetaData());
}

/**
* Checks for the case where the column requirement is subset or equal.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rapidminer/belt/util/SortingInt.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private static void partialDescendingSort(int[] src, int[] indices, int start, i
* <p>The method assumes that the index array contains a valid mapping.
*
* @param src
* the source array (remains unchanged))
* the source array (remains unchanged)
* @param indices
* the index mapping (will be reordered)
* @param buffer
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/com/rapidminer/belt/ContextTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.runners.Parameterized;

import com.rapidminer.belt.execution.Context;
import com.rapidminer.belt.execution.SequentialContext;
import com.rapidminer.belt.util.Belt;


Expand All @@ -59,7 +60,9 @@ public static class ForEveryContext {

@Parameterized.Parameters(name = "{0}")
public static Iterable<Context> workloads() {
return Arrays.asList(Belt.defaultContext(), Context.singleThreaded(Belt.defaultContext()));
return Arrays.asList(Belt.defaultContext(),
Context.singleThreaded(Belt.defaultContext()),
new SequentialContext());
}

@Test
Expand Down
15 changes: 9 additions & 6 deletions src/test/java/com/rapidminer/belt/table/AppenderTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
Expand Down Expand Up @@ -866,26 +867,28 @@ public void testDifferentTables() {
}

@Test
public void testCallbackMonotonicity() {
public void testCallbackNotConstant() {
TableBuilder builder = Builders.newTableBuilder(17);
builder.addTime("time", i -> LocalTime.of(i, i));
builder.addNominal("nominal", i -> "value_" + i);
builder.addMetaData("nominal", ColumnRole.ID);
builder.addDateTime("date-time", i -> Instant.EPOCH);
Table table = builder.build(Belt.defaultContext());
Table[] tables = new Table[99];
Table[] tables = new Table[9];
Arrays.fill(tables, table);
List<Table> tableList = Arrays.asList(tables);
final double[] callbackResult = new double[]{-1.0};
AtomicBoolean found = new AtomicBoolean(false);
Appender.append(tableList, p -> {
if (callbackResult[0] > p) {
throw new IllegalStateException();
} else {
callbackResult[0] = p;
if (p > 0 && p < 1) {
found.set(true);
}
callbackResult[0] = p;
}, Belt.defaultContext());
assertTrue(found.get());
assertEquals(1.0, callbackResult[0], EPSILON);
}

}

public static class TableAppendInput {
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/com/rapidminer/belt/table/TablesTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.time.LocalTime;
Expand All @@ -32,6 +34,10 @@
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;

import com.rapidminer.belt.buffer.Buffers;
import com.rapidminer.belt.buffer.NominalBuffer;
import com.rapidminer.belt.column.Column;
import com.rapidminer.belt.column.Columns;
import com.rapidminer.belt.execution.Context;
import com.rapidminer.belt.util.Belt;

Expand Down Expand Up @@ -438,6 +444,59 @@ public void testSimilarToTableReorderKeepMerge() {
}
}

public static class GapRemoval {

	/**
	 * Creates a table with three nominal columns backed by the same 100-row buffer and returns its rows
	 * {@code 0} to {@code 50}.
	 */
	private static Table firstFiftyRows() {
		NominalBuffer buffer = Buffers.nominalBuffer(100);
		buffer.set(0, "red");
		// row 1 is set to "green" first and overwritten with "blue" by the loop below
		// NOTE(review): presumably done so that "green" is in the dictionary but unused in the first rows - confirm
		buffer.set(1, "green");
		for (int row = 1; row <= 50; row++) {
			buffer.set(row, "blue");
		}
		for (int row = 51; row < 100; row++) {
			buffer.set(row, "green");
		}
		Table full = Builders.newTableBuilder(100)
				.add("nominal", buffer.toColumn())
				.add("nominal2", buffer.toColumn())
				.add("nominal3", buffer.toColumn())
				.build(Belt.defaultContext());
		return full.rows(0, 50, Belt.defaultContext());
	}

	/**
	 * Asserts that for every column of the table the dictionary size matches its maximal index.
	 */
	private static void assertDictionariesCompact(Table table) {
		for (Column column : table.columnList()) {
			assertEquals(column.getDictionary().maximalIndex(), column.getDictionary().size());
		}
	}

	/**
	 * Rebuilds the table with unused dictionary values removed from two of its columns and expects
	 * {@link Tables#compactDictionaries(Table)} to return a different table whose dictionaries are all compact.
	 */
	@Test
	public void testGapRemoval() {
		Table sliced = firstFiftyRows();
		Column removed = Columns.removeUnusedDictionaryValues(sliced.column("nominal"),
				Columns.CleanupOption.REMOVE, Belt.defaultContext());
		Column untouched = sliced.column("nominal2");
		Column cleaned = Columns.removeUnusedDictionaryValues(sliced.column("nominal3"),
				Columns.CleanupOption.COMPACT, Belt.defaultContext());
		Table rebuilt = Builders.newTableBuilder(50)
				.add("nominal", removed)
				.add("nominal2", untouched)
				.add("nominal3", cleaned)
				.build(Belt.defaultContext());

		Table compacted = Tables.compactDictionaries(rebuilt);

		assertNotSame(rebuilt, compacted);
		assertDictionariesCompact(compacted);
	}

	/**
	 * Expects {@link Tables#compactDictionaries(Table)} to hand back the very same table instance when the
	 * sliced table is passed in unchanged.
	 */
	@Test
	public void testNoGapRemoval() {
		Table sliced = firstFiftyRows();

		Table compacted = Tables.compactDictionaries(sliced);

		assertSame(sliced, compacted);
		assertDictionariesCompact(compacted);
	}
}

public static class Incompatible {

Expand Down

0 comments on commit 73c70b4

Please sign in to comment.