Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft for a buffered writing of files #183

Merged
merged 17 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gama.core/src/gama/core/common/interfaces/IKeyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public interface IKeyword {

/** The browse. */
String BROWSE = "browse";

String BUFFERING = "buffering";

/** The camera. */
String CAMERA = "camera";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ protected void postStep(final IScope scope) {
executer.executeOneShotActions();
if (outputs != null) { outputs.step(this.getScope()); }
ownClock.step();
GAMA.flushWriteStep(this);
}

@Override
Expand Down Expand Up @@ -437,8 +438,8 @@ public Object _init_(final IScope scope) {
public void dispose() {
if (dead) return;
executer.executeDisposeActions();
// hqnghi if simulation come from popultion extern, dispose pop first
// and then their outputs
// hqnghi if simulation comes from an external population, dispose this population first
// and then its outputs

if (externMicroPopulations != null) { externMicroPopulations.clear(); }

Expand All @@ -455,6 +456,9 @@ public void dispose() {
}
}
if (externalInitsAndParameters != null) { externalInitsAndParameters.clear(); }

//we make sure that all pending write operations are flushed
GAMA.flushWriteSimulation(this);
GAMA.releaseScope(getScope());
// scope = null;
super.dispose();
Expand Down
34 changes: 34 additions & 0 deletions gama.core/src/gama/core/runtime/GAMA.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
********************************************************************************************************/
package gama.core.runtime;

import java.io.File;
lesquoyb marked this conversation as resolved.
Show resolved Hide resolved
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -35,6 +36,8 @@
import gama.core.runtime.IExperimentStateListener.State;
import gama.core.runtime.benchmark.Benchmark;
import gama.core.runtime.benchmark.StopWatch;
import gama.core.runtime.concurrent.WriteController;
import gama.core.runtime.concurrent.WriteController.BufferingStrategies;
import gama.core.runtime.exceptions.GamaRuntimeException;
import gama.core.runtime.exceptions.GamaRuntimeException.GamaRuntimeFileException;
import gama.dev.DEBUG;
Expand Down Expand Up @@ -98,6 +101,37 @@ public class GAMA {
// hqnghi: add several controllers to have multi-thread experiments
private static final List<IExperimentController> controllers = new CopyOnWriteArrayList<>();

private static final WriteController writeController = new WriteController();

public static boolean askWriteFileSimulation(SimulationAgent owner, File f, String content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.PER_SIMULATION_BUFFERING);
}
public static boolean askWriteFileSimulation(SimulationAgent owner, File f, CharSequence content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.PER_SIMULATION_BUFFERING);
}

public static boolean askWriteFileCycle(SimulationAgent owner, File f, String content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.PER_CYCLE_BUFFERING);
}
public static boolean askWriteFileCycle(SimulationAgent owner, File f, CharSequence content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.PER_CYCLE_BUFFERING);
}

public static boolean askWriteFileDirect(SimulationAgent owner, File f, String content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.NO_BUFFERING);
}
public static boolean askWriteFileDirect(SimulationAgent owner, File f, CharSequence content) {
return writeController.askWrite(f.getAbsolutePath(), owner, content, BufferingStrategies.NO_BUFFERING);
}

public static boolean flushWriteSimulation(SimulationAgent owner) {
return writeController.flushSimulationOwner(owner);
}
public static boolean flushWriteStep(SimulationAgent owner) {
return writeController.flushCycleOwner(owner);
}
//TODO: other flushes + their operators

/**
* Gets the controllers.
*
Expand Down
145 changes: 145 additions & 0 deletions gama.core/src/gama/core/runtime/concurrent/WriteController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package gama.core.runtime.concurrent;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.NotImplementedException;

import gama.core.kernel.simulation.SimulationAgent;
import gama.core.runtime.GAMA;
import gama.core.runtime.exceptions.GamaRuntimeException;

public class WriteController {

public enum BufferingStrategies{
NO_BUFFERING,
PER_CYCLE_BUFFERING,
PER_SIMULATION_BUFFERING
}


protected Map<String, Map<SimulationAgent, StringBuilder>> fileWritingPerSimulationMap;
protected Map<String, Map<SimulationAgent, StringBuilder>> fileWritingPerCycleMap;

public WriteController() {
fileWritingPerSimulationMap = new HashMap<>();
fileWritingPerCycleMap = new HashMap<>();
}

public boolean askWrite(String fileId, SimulationAgent owner, CharSequence content, BufferingStrategies bs) {
switch (bs) {
case PER_SIMULATION_BUFFERING:
return appendWriteSimulation(fileId, owner, content);
case PER_CYCLE_BUFFERING:
return appendWriteCycle(fileId, owner, content);
case NO_BUFFERING:
return directWrite(fileId, content);
default:
throw GamaRuntimeException.create(new NotImplementedException("This buffering strategie has not been implemented yet: " + bs.toString()), owner.getScope());
}
}

protected boolean appendWriteRequestToMap(String fileId, SimulationAgent owner, CharSequence content, Map<String, Map<SimulationAgent, StringBuilder>> map) {
// If we don't have any map for this file yet we create one
Map<SimulationAgent, StringBuilder> fileSavingAsksMap = map.get(fileId);
if (fileSavingAsksMap == null) {
fileSavingAsksMap = new HashMap<>();
map.put(fileId, fileSavingAsksMap);
}

// We loop up for the previous write request of the owner simulation in the map
// if there's already one we append our content, else we create one with the content as its initial value
StringBuilder askRequest = fileSavingAsksMap.get(owner);
if (askRequest == null) {
try {
fileSavingAsksMap.put(owner, new StringBuilder(content));
return true;
}
catch(Exception ex) {
GAMA.reportError(owner.getScope(), GamaRuntimeException.create(ex, owner.getScope()), false);
return false;
}
}
else {
askRequest.append(content);
return true;
}
}

protected boolean appendWriteSimulation(String fileId, SimulationAgent owner, CharSequence content) {
return appendWriteRequestToMap(fileId, owner, content, fileWritingPerSimulationMap);
}

protected boolean appendWriteCycle(String fileId, SimulationAgent owner, CharSequence content) {
return appendWriteRequestToMap(fileId, owner, content, fileWritingPerCycleMap);
}
protected boolean directWrite(String fileId, CharSequence content ) {
try {
var fr = new FileWriter(new File(fileId));
fr.append(content);
fr.flush();
return true;
} catch (IOException e) {
e.printStackTrace();
return false;
}
}

//
// public boolean flushFile(String fileId) {
// FileWriter fr;
// try {
// fr = new FileWriter(new File(fileId));
// // we merge everything
// for(var asks : fileWritingPerSimulationMap.get(fileId)) {
// fr.append(asks.cumulatedContent);
// }
// fr.flush();
// return true;
// } catch (IOException e) {
// e.printStackTrace();
// return false;
// }
//
// }
/**
* Flushes all the save requests made by a simulation saved in a map
* @param owner: the simulation in which the save statements have been executed
* @return true if everything went well, false in case of error
*/
protected boolean flushMapOwner(SimulationAgent owner, Map<String, Map<SimulationAgent, StringBuilder>> map) {
for(var entry : map.entrySet()) {
var cumulatedContent = entry.getValue().get(owner);
if (cumulatedContent != null) {
var success = directWrite(entry.getKey(), cumulatedContent);
if (!success) {
return false;
}
}
}
return true;
}
/**
* Flushes all the save requests made by a simulation with the 'per_simulation_buffering' strategy
* @param owner: the simulation in which the save statements have been executed
* @return true if everything went well, false in case of error
*/
public boolean flushSimulationOwner(SimulationAgent owner) {
return flushMapOwner(owner, fileWritingPerSimulationMap);
}

/**
* Flushes all the save requests made by a simulation with the 'per_cycle_buffering' strategy
* @param owner: the simulation in which the save statements have been executed
* @return true if everything went well, false in case of error
*/
public boolean flushCycleOwner(SimulationAgent owner) {
return flushMapOwner(owner, fileWritingPerCycleMap);
}

}
Loading
Loading