Commit 31252627 authored by yu's avatar yu

Rewrite the testingEmbulk to TestHelper

parent 05999bcb
package org.embulk.output.joinfile;
import java.util.List;
import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.Page;
import org.embulk.spi.TransactionalPageOutput;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
/**
 * Embulk output plugin that writes the content of incoming pages into one file.
 *
 * <p>The target path is {@code path_prefix} immediately followed by
 * {@code file_ext}; no separator is inserted between the two. The file is
 * opened in {@link #transaction}, written by the page outputs returned from
 * {@link #open}, and closed as soon as {@code control.run} returns or throws.
 *
 * <p>The stream lives in a static field, so it is shared by every instance of
 * this class; a second transaction started while one is running would replace
 * the stream of the first.
 */
public class JoinfileOutputPlugin
        implements OutputPlugin
{
    /** Configuration accepted by this plugin. */
    public interface PluginTask
            extends Task
    {
        // Leading part of the output file path (required string).
        @Config("path_prefix")
        public String getPathPrefix();

        // Trailing part of the output file path, appended verbatim to path_prefix (required string).
        @Config("file_ext")
        public String getFileExt();
    }

    private final Logger log = Exec.getLogger(getClass());

    // Stream of the file currently being written; assigned in transaction().
    private static FileOutputStream output = null;

    /**
     * Opens the output file, runs all output tasks and closes the file again.
     *
     * @param config    plugin configuration, loaded as {@link PluginTask}
     * @param schema    schema of the incoming records (not used here)
     * @param taskCount number of tasks (not used here)
     * @param control   callback that executes the tasks
     * @return an empty config diff
     * @throws RuntimeException if the output file cannot be opened or closed
     */
    @Override
    public ConfigDiff transaction(ConfigSource config,
            Schema schema, int taskCount,
            OutputPlugin.Control control)
    {
        PluginTask task = config.loadConfig(PluginTask.class);
        // non-retryable (non-idempotent) output: resume() is not supported.
        log.info("In the transaction {}", config);
        String path = task.getPathPrefix() + task.getFileExt();
        try {
            output = new FileOutputStream(new File(path));
        }
        catch (FileNotFoundException ex) {
            throw new RuntimeException(ex);
        }
        try {
            control.run(task.dump());
        }
        finally {
            // Close the file even when a task fails, so the handle is not leaked.
            closeFile();
        }
        log.info("In the transaction ");
        return Exec.newConfigDiff();
    }

    /**
     * Resuming is not supported by this plugin.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ConfigDiff resume(TaskSource taskSource,
            Schema schema, int taskCount,
            OutputPlugin.Control control)
    {
        throw new UnsupportedOperationException("joinfile output plugin does not support resuming");
    }

    /** Nothing to clean up; the file is already closed by {@link #transaction}. */
    @Override
    public void cleanup(TaskSource taskSource,
            Schema schema, int taskCount,
            List<TaskReport> successTaskReports)
    {
    }

    /**
     * Creates the page output of one task. Every page added to it has its
     * string reference at index 1 written to the shared output file.
     *
     * @param taskSource task configuration, loaded as {@link PluginTask}
     * @param schema     schema of the incoming records (not used here)
     * @param taskIndex  index of the task, only used for logging
     * @return the page output that writes to the shared file
     */
    @Override
    public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex)
    {
        // The loaded task is not used, but loading it is kept from the original code.
        taskSource.loadTask(PluginTask.class);
        log.info("In the open {} # {}", taskSource, taskIndex);
        return new TransactionalPageOutput() {
            @Override
            public void add(Page page)
            {
                log.info("The ADD: {} ## {}", page.getStringReferences(), page.getValueReferences());
                try {
                    // NOTE(review): getBytes() uses the platform default charset; confirm this
                    // matches the encoding the upstream parser used to build the string.
                    // NOTE(review): the page is never released here - confirm whether the
                    // output is expected to call page.release() after consuming it.
                    output.write(page.getStringReference(1).getBytes());
                }
                catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
            }

            @Override
            public void finish()
            {
                log.info("Finished");
            }

            @Override
            public void close()
            {
                log.info("closed");
            }

            @Override
            public void abort()
            {
            }

            @Override
            public TaskReport commit()
            {
                return Exec.newTaskReport();
            }
        };
    }

    /**
     * Closes the shared output stream if one has been opened.
     *
     * @throws RuntimeException if closing the stream fails
     */
    public static void closeFile()
    {
        if (output != null) {
            try {
                output.close();
            }
            catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}
......@@ -8,6 +8,7 @@ import org.embulk.test.EmbulkTests;
import org.embulk.test.TestingEmbulk;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.OutputPlugin;
import org.embulk.spi.SchemaConfig;
import org.embulk.spi.ColumnConfig;
import org.junit.Rule;
......@@ -41,44 +42,9 @@ public class TestFilenameFileInputPlugin
// Embedded Embulk instance used by the tests in this class. It is built with the
// filename input plugin, the none-bin parser plugin and the joinfile output plugin
// registered.
// NOTE(review): the output plugin is registered under the name "none-bin", the same
// name as the parser plugin; presumably "joinfile" was intended - confirm against the
// configs that select the output type.
public TestingEmbulk embulk = TestingEmbulk.builder()
.registerPlugin(InputPlugin.class,"filename",FilenameFileInputPlugin.class)
.registerPlugin(ParserPlugin.class,"none-bin",NoneBinParserPlugin.class)
.registerPlugin(OutputPlugin.class,"none-bin",JoinfileOutputPlugin.class)
.build();
/**
 * Runs the filename input plugin on data/test.csv and checks the produced output.csv:
 * the first output line must be the input file path followed by a NUL character, the
 * line count must match the source, and every line after the first must be identical
 * to the source line at the same index.
 *
 * I/O failures while reading the files now fail the test instead of being printed and
 * ignored.
 */
@Test
public void test() throws Exception{
    // Locate the resources directory through a resource that is known to exist.
    File rootFile = new File(TestFilenameFileInputPlugin.class.getResource("/test.yml").toURI()).getParentFile();
    String rootPath = rootFile.getAbsolutePath();
    System.out.println("This is the root of the resources: "+rootPath);
    Path sourcePath = Paths.get(rootPath+"/data/test.csv");
    Path out1 = Paths.get(rootPath+"/output.csv");

    // The config could also be loaded from /test.yml; here it is defined in code.
    ConfigSource config = embulk.newConfig()
        .set("type","filename")
        .set("path_prefix",rootPath+"/data/test.csv")
        .set("parser",embulk.newConfig()
            .set("charset","UTF-8")
            .set("newline","CRLF")
            .set("type","csv")
            .set("delimiter",",")
            .set("quote","")
            .set("columns",newSchemaConfig("filename:string")));
    embulk.runInput(config,out1);

    List<String> sourceLines = Files.readAllLines(sourcePath);
    List<String> targetLines = Files.readAllLines(out1);
    char zero = (char) 0;
    // assertEquals takes (expected, actual).
    assertEquals(rootPath+"/data/test.csv"+zero, targetLines.get(0));
    assertEquals(sourceLines.size(), targetLines.size());
    // Line 0 was checked above, so the comparison starts at index 1.
    for (int i = 1; i < sourceLines.size(); i++) {
        assertEquals(sourceLines.get(i), targetLines.get(i));
    }
}
@Test
public void testModifiedOrder() throws Exception{
// ConfigSource config = embulk.loadYamlResource("testModifiedOrder.yml");
......@@ -111,7 +77,7 @@ public class TestFilenameFileInputPlugin
List<Path> arrayOut = new ArrayList<Path> ();
Files.list(out1.getParent()).forEach(a ->{
try { if(Files.isDirectory(a)) { Files.list(a).forEach(b -> arrayOut.add(b)); }}
catch (IOException ex) {ex.printStackTrace();}
catch (IOException ex) {ex.printStackTrace();}
});
System.out.println("Not sorted yet " + arrayOut);
......
/*
// This TestHelper is written for unit tests that need several third-party plugins.
// For example, to test the filename plugin, I need parser-none-bin and output-joinfile.
// With the Embulk test framework, testing an input plugin always uses the csv parser and the file output!
// To use other plugins, just register them when initializing the embulk rule:
@Rule
public TestingEmbulk embulk = TestingEmbulk.builder()
.registerPlugin(InputPlugin.class,"filename",FilenameFileInputPlugin.class)
.registerPlugin(ParserPlugin.class,"none-bin",NoneBinParserPlugin.class)
.build();
// For the configSource, you can read the yml file in the resources
embulk.runAllBuilder("Path to your config.yml");
// Or you can generate the configSource manually
ConfigSource inConfig = embulk.newConfig()
.set("type","filename")
.set("path_prefix",rootPath+"/data/test.csv")
.set("parser",embulk.newConfig()
.set("charset","UTF-8")
.set("newline","CRLF")
.set("type","csv")
.set("delimiter",",")
.set("quote","")
.set("columns",embulk.newSchemaConfig("filename:string")));
ConfigSource execConfig = embulk.newConfig()
.set("max_threads","1");
ConfigSource outConfig = embulk.newConfig(); // then set the output plugin options
//Two configs are required (inConfig and outConfig) and two are optional (execConfig and filtersConfig)
embulk.runAllBuilder(inConfig,outConfig);
//Or:
embulk.runAllBuilder(execConfig,inConfig,outConfig);
//Or:
embulk.runAllBuilder(execConfig,inConfig,filtersConfig,outConfig);
//If you want to use a temp directory for the output path
Path tmp = embulk.createTempDir();
ConfigSource outConfig = embulk.newConfig()
.set("type","joinfile")
.set("path_prefix",tmp.toString() + "/sample_")
.set("file_ext","txt");
// After running embulk you can read the files from tmp to assert that the result is OK.
*/
package org.embulk.input.filename;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.common.io.ByteStreams;
import org.embulk.test.EmbulkTests;
import org.embulk.test.PreviewResultInputPlugin;
//import org.embulk.test.TestingBulkLoader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import org.embulk.EmbulkEmbed;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.SchemaConfig;
import org.embulk.config.ColumnConfig;
import org.embulk.spi.TempFileException;
import org.embulk.spi.TempFileSpace;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;
import org.embulk.exec.PreviewResult;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.embulk.plugin.InjectedPluginSource.registerPluginTo;
//The import of bulkLoader
import com.google.common.base.Function;
import com.google.common.base.Optional;
//import com.google.common.collect.ImmutableList;
//import com.google.inject.Binder;
import com.google.inject.Inject;
//import com.google.inject.Injector;
//import com.google.inject.Module;
import com.google.inject.util.Modules;
//import java.util.List;
//import org.embulk.config.ConfigSource;
//import org.embulk.config.TaskReport;
import org.embulk.exec.BulkLoader;
import org.embulk.exec.ExecutionResult;
import org.embulk.exec.ForSystemConfig;
import org.embulk.exec.ResumeState;
import org.embulk.spi.Exec;
import org.embulk.spi.ExecSession;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.Schema;
import org.slf4j.Logger;
public class TestHelper implements TestRule
{
public static class Builder{
private List<Module> modules = new ArrayList<>();
Builder()
{}
public <T> Builder registerPlugin(final Class<T> iface, final String name, final Class<?> impl)
{
modules.add(new Module() {
public void configure(Binder binder)
{
registerPluginTo(binder, iface, name, impl);
}
});
return this;
}
public TestHelper build()
{
return new TestHelper(this);
}
}
public static Builder builder()
{
return new Builder();
}
private final List<Module> modules;
private EmbulkEmbed embed;
private TempFileSpace tempFiles;
TestHelper(Builder builder)
{
this.modules = ImmutableList.copyOf(builder.modules);
reset();
}
public void reset()
{
destroy();
this.embed = new EmbulkEmbed.Bootstrap()
.addModules(modules)
.overrideModules(TestingBulkLoader.override())
.initializeCloseable();
try {
this.tempFiles = new TempFileSpace(Files.createTempDirectory("embulk-test-temp-").toFile());
} catch (IOException ex) {
throw new TempFileException(ex);
}
}
public void destroy()
{
if (embed != null){
embed.destroy();
embed = null;
}
if (tempFiles != null){
tempFiles.cleanup();
tempFiles = null;
}
}
@Override
public Statement apply(Statement base, Description description)
{
return new EmbulkTestingEmbedWatcher().apply(base, description);
}
private class EmbulkTestingEmbedWatcher extends TestWatcher
{
@Override
protected void starting(Description description)
{
reset();
}
@Override
protected void finished(Description description)
{
destroy();
}
}
//This is very strange you need to create a file to create a temp directory
public Path createTempFile(String suffix)
{
return tempFiles.createTempFile(suffix).toPath();
}
public Path createTempDir()
{
Path tp = tempFiles.createTempFile(null);
tp.toFile().deleteOnExit();
return tp;
}
// Useless
public Injector injector()
{
return embed.getInjector();
}
public ConfigLoader configLoader()
{
return embed.newConfigLoader();
}
public ConfigSource newConfig()
{
return configLoader().newConfigSource();
}
public SchemaConfig newSchemaConfig(String...configs){
ImmutableList.Builder<ColumnConfig> schema = ImmutableList.builder();
for (String column: configs){
ColumnConfig columnConfig = newColumnConfig(column);
if (columnConfig != null){
schema.add(columnConfig);
}
}
return new SchemaConfig(schema.build());
}
public ColumnConfig newColumnConfig(String column){
String[] tuple = column.split(":",2);
return new ColumnConfig(embulk.newConfig()
.set("name",tuple[0])
.set("type",tuple[1]));
}
//Need to import the EMbulkTests
public ConfigSource loadYamlResource(String name)
{
return configLoader().fromYamlString(EmbulkTests.readResource(name));
}
private static final List<String> SUPPORTED_TYPE = ImmutableList.of("boolean","long","double","string","timestamp","json");
public static interface RunResult
{
ConfigDiff getConfigDiff();
List<Throwable> getIgnoredExceptions();
Schema getInputSchema();
Schema getOutputSchema();
List<TaskReport> getInputTaskReports();
List<TaskReport> getOutputTaskReports();
}
//Do not use the InputBuilder, ParserBuilder, OutputBuilder, I sum them together
public class AllBuilder
{
private ConfigSource inConfig = null;
private List<ConfigSource> filtersConfig = ImmutableList.of();
private ConfigSource execConfig = null;
private ConfigSource outConfig = null;
private ConfigSource config = null;
//private Path outputPath = null;
private AllBuilder()
{}
// In the inConfig, the parser config should be set.
public AllBuilder in(ConfigSource inConfig)
{
checkNotNull(inConfig,"inConfig");
this.inConfig = inConfig.deepCopy();
return this;
}
public AllBuilder filters (List<ConfigSource> filtersConfig)
{
checkNotNull(filtersConfig,"filtersConfig");
ImmutableList.Builder<ConfigSource> builder = ImmutableList.builder();
for (ConfigSource filter : filtersConfig){
builder.add(filter.deepCopy());
}
this.filtersConfig = builder.build();
return this;
}
public AllBuilder exec (ConfigSource execConfig)
{
checkNotNull(execConfig,"execConfig");
this.execConfig = execConfig.deepCopy();
return this;
}
public AllBuilder out(ConfigSource outConfig)
{
checkNotNull(outConfig,"outConfig");
this.outConfig = outConfig.deepCopy();
return this;
}
//public ConfigDiff guess(){}
//public PreviewResult preview() throws IOException{}
public RunResult run() throws IOException
{
checkState(inConfig != null, "in config must be set");
checkState(outConfig != null, "out config must be set");
ConfigSource config = newConfig()
.set("exec",execConfig)
.set("in",inConfig)
.set("filters",filtersConfig)
.set("out",outConfig);
return (RunResult) embed.run(config);
}
public RunResult run() throws IOException
{
checkState(inConfig != null, "in config must be set");
checkState(outConfig != null, "out config must be set");
ConfigSource config = newConfig()
.set("exec",execConfig)
.set("in",inConfig)
.set("filters",filtersConfig)
.set("out",outConfig);
return (RunResult) embed.run(config);
}
public RunResult runFromYml(String name) throws IOException
{
ConfigSource config = loadYamlResource(name);
return (RunResult) embed.run(config);
}
}
private RunResult buildRunResultWithOutput(RunResult result, Path outputDir, Path outputPath) throws IOException
{
copyToPath(outputDir, outputPath);
return result;
}
private void copyToPath(Path outputDir, Path outputPath) throws IOException
{
try (OutputStream out = Files.newOutputStream(outputPath)){
List<Path> fragments = new ArrayList<Path> ();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(outputDir, "fragments_*.csv")){
for (Path fragment : stream){
fragments.add(fragment);
}
}
Collections.sort(fragments);
for (Path fragment : fragments) {
try (InputStream in = Files.newInputStream(fragment)){
ByteStreams.copy(in,out);
}
}
}
}
public AllBuilder allBuilder()
{
return new AllBuilder();
}
public RunResult runAllBuilder(String name) throws IOException
{
return allBuilder()
.runFromYml(String name);
}
public RunResult runAllBuilder(ConfigSource inConfig, ConfigSource outConfig) throws IOException
{
return allBuilder()
.in(inConfig)
.out(outConfig)
.run();
}
public RunResult runAllBuilder(ConfigSource execConfig,ConfigSource inConfig, ConfigSource outConfig) throws IOException
{
return allBuilder()
.exec(execConfig)
.in(inConfig)
.out(outConfig)
.run();
}
public RunResult runAllBuilder(ConfigSource execConfig,ConfigSource inConfig, ConfigSource filtersConfig, ConfigSource outConfig) throws IOException
{
return allBuilder()
.exec(execConfig)
.in(inConfig)
.filters(filtersConfig)
.out(outConfig)
.run();
}
}
//the testingbulkloader is under here
/**
 * Bulk loader used in tests. Its loader state wraps every execution result in
 * a {@link TestingExecutionResult}, which also exposes the schemas and task
 * reports through {@link TestHelper.RunResult}.
 */
class TestingBulkLoader
        extends BulkLoader
{
    /**
     * Returns a function that overrides a module list so that
     * {@link BulkLoader} is bound to this class and the "preview_result" input
     * plugin is registered.
     */
    static Function<List<Module>, List<Module>> override()
    {
        return new Function<List<Module>, List<Module>>() {
            @Override
            public List<Module> apply(List<Module> original)
            {
                Module testingBindings = new Module() {
                    public void configure(Binder binder)
                    {
                        binder.bind(BulkLoader.class).to(TestingBulkLoader.class);
                        registerPluginTo(binder, InputPlugin.class, "preview_result", PreviewResultInputPlugin.class);
                    }
                };
                Module combined = Modules.override(original).with(ImmutableList.of(testingBindings));
                return ImmutableList.of(combined);
            }
        };
    }

    @Inject
    public TestingBulkLoader(Injector injector,
            @ForSystemConfig ConfigSource systemConfig)
    {
        super(injector, systemConfig);
    }

    @Override
    protected LoaderState newLoaderState(Logger logger, ProcessPluginSet plugins)
    {
        return new TestingLoaderState(logger, plugins);
    }

    /** Loader state whose execution result is a {@link TestingExecutionResult}. */
    protected static class TestingLoaderState
            extends LoaderState
    {
        public TestingLoaderState(Logger logger, ProcessPluginSet plugins)
        {
            super(logger, plugins);
        }

        @Override
        public ExecutionResult buildExecuteResultWithWarningException(Throwable ex)
        {
            ExecutionResult plain = super.buildExecuteResultWithWarningException(ex);
            return new TestingExecutionResult(plain, buildResumeState(Exec.session()), Exec.session());
        }
    }

    /** Execution result that additionally carries the schemas and task reports of the run. */
    static class TestingExecutionResult
            extends ExecutionResult
            implements TestHelper.RunResult
    {
        private final Schema inputSchema;
        private final Schema outputSchema;
        private final List<TaskReport> inputTaskReports;
        private final List<TaskReport> outputTaskReports;

        /**
         * Copies config diff, skipped flag and ignored exceptions from
         * {@code orig}; schemas and task reports are taken from {@code resumeState}.
         */
        public TestingExecutionResult(ExecutionResult orig,
                ResumeState resumeState, ExecSession session)
        {
            super(orig.getConfigDiff(), orig.isSkipped(), orig.getIgnoredExceptions());
            this.inputSchema = resumeState.getInputSchema();
            this.outputSchema = resumeState.getOutputSchema();
            this.inputTaskReports = buildReports(resumeState.getInputTaskReports(), session);
            this.outputTaskReports = buildReports(resumeState.getOutputTaskReports(), session);
        }

        /** Unwraps the optional reports, using a new task report of the session for absent ones. */
        private static List<TaskReport> buildReports(List<Optional<TaskReport>> optionalReports, ExecSession session)
        {
            ImmutableList.Builder<TaskReport> collected = ImmutableList.builder();
            for (Optional<TaskReport> maybeReport : optionalReports) {
                TaskReport fallback = session.newTaskReport();
                collected.add(maybeReport.or(fallback));
            }
            return collected.build();
        }

        @Override
        public Schema getInputSchema()
        {
            return this.inputSchema;
        }

        @Override
        public Schema getOutputSchema()
        {
            return this.outputSchema;
        }

        @Override
        public List<TaskReport> getInputTaskReports()
        {
            return this.inputTaskReports;
        }

        @Override
        public List<TaskReport> getOutputTaskReports()
        {
            return this.outputTaskReports;
        }
    }
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment