Commit b6db701b authored by Klaus Wölfel's avatar Klaus Wölfel

Allow none-parsing of binary files

parent 059f1e3d
...@@ -7,6 +7,9 @@ import com.github.jrubygradle.JRubyExec ...@@ -7,6 +7,9 @@ import com.github.jrubygradle.JRubyExec
repositories { repositories {
mavenCentral() mavenCentral()
jcenter() jcenter()
maven {
url "https://repo.maven.apache.org/maven2/"
}
} }
configurations { configurations {
provided provided
...@@ -16,6 +19,7 @@ version = "0.2.0" ...@@ -16,6 +19,7 @@ version = "0.2.0"
dependencies { dependencies {
compile "org.embulk:embulk-core:0.6.18" compile "org.embulk:embulk-core:0.6.18"
compile "commons-codec:commons-codec:1.9"
provided "org.embulk:embulk-core:0.6.18" provided "org.embulk:embulk-core:0.6.18"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
testCompile "junit:junit:4.+" testCompile "junit:junit:4.+"
......
...@@ -14,23 +14,38 @@ import org.embulk.spi.SchemaConfig; ...@@ -14,23 +14,38 @@ import org.embulk.spi.SchemaConfig;
import org.embulk.spi.Exec; import org.embulk.spi.Exec;
import org.embulk.spi.PageBuilder; import org.embulk.spi.PageBuilder;
import org.embulk.spi.util.LineDecoder; import org.embulk.spi.util.FileInputInputStream;
import org.embulk.spi.ColumnConfig; import org.embulk.spi.ColumnConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.commons.codec.binary.Base64;
import static org.embulk.spi.type.Types.STRING; import static org.embulk.spi.type.Types.STRING;
import org.slf4j.Logger;
public class NoneParserPlugin public class NoneParserPlugin
implements ParserPlugin implements ParserPlugin
{ {
public interface PluginTask public interface PluginTask
extends Task, LineDecoder.DecoderTask //, TimestampParser.Task extends Task //, LineDecoder.DecoderTask //, TimestampParser.Task
{ {
@Config("column_name") @Config("column_name")
@ConfigDefault("\"payload\"") @ConfigDefault("\"payload\"")
public String getColumnName(); public String getColumnName();
} }
private final Logger log;
public NoneParserPlugin()
{
this.log = Exec.getLogger(NoneParserPlugin.class);
}
@Override @Override
public void transaction(ConfigSource config, ParserPlugin.Control control) public void transaction(ConfigSource config, ParserPlugin.Control control)
{ {
...@@ -49,22 +64,30 @@ public class NoneParserPlugin ...@@ -49,22 +64,30 @@ public class NoneParserPlugin
FileInput input, PageOutput output) FileInput input, PageOutput output)
{ {
PluginTask task = taskSource.loadTask(PluginTask.class); PluginTask task = taskSource.loadTask(PluginTask.class);
LineDecoder lineDecoder = new LineDecoder(input,task); List<String> files = task.getFiles();
log.info(""+files);
FileInputInputStream dataIn = new FileInputInputStream(input);
PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output); PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output);
String line = null; String line = null;
Integer offset = 0;
Integer bytes_read = 0;
Integer chunksize = 1024;
final String columnName = task.getColumnName(); final String columnName = task.getColumnName();
while( input.nextFile() ){ while( input.nextFile() ){
while(true){ while( true ) {
line = lineDecoder.poll(); byte[] bytesArray = new byte[chunksize];
bytes_read = dataIn.read(bytesArray, offset, chunksize);
if( line == null ){ if( bytes_read == -1 ) {
break; break;
} }
offset += bytes_read;
byte[] fittedBytesArray = Arrays.copyOfRange(bytesArray, 0, bytes_read);
line = Base64.encodeBase64String(fittedBytesArray);
pageBuilder.setString(0, line); pageBuilder.setString(0, line);
pageBuilder.addRecord(); pageBuilder.addRecord();
} }
} }
pageBuilder.finish(); pageBuilder.finish();
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment