Commit fe929a1c authored by Klaus Wölfel's avatar Klaus Wölfel

Read filename from first bytes of stream and provide it as tag

parent b6db701b
...@@ -29,7 +29,7 @@ import org.slf4j.Logger; ...@@ -29,7 +29,7 @@ import org.slf4j.Logger;
public class NoneParserPlugin public class NoneParserPlugin
implements ParserPlugin implements ParserPlugin
{ {
static int MAX_NAME_LENGTH = 255;
public interface PluginTask public interface PluginTask
extends Task //, LineDecoder.DecoderTask //, TimestampParser.Task extends Task //, LineDecoder.DecoderTask //, TimestampParser.Task
...@@ -53,7 +53,8 @@ public class NoneParserPlugin ...@@ -53,7 +53,8 @@ public class NoneParserPlugin
ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>(); ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>();
final String columnName = task.getColumnName(); final String columnName = task.getColumnName();
columns.add(new ColumnConfig(columnName, STRING ,config)); columns.add(new ColumnConfig(columnName, STRING, config));
columns.add(new ColumnConfig("tag", STRING, config));
Schema schema = new SchemaConfig(columns).toSchema(); Schema schema = new SchemaConfig(columns).toSchema();
control.run(task.dump(), schema); control.run(task.dump(), schema);
...@@ -64,28 +65,43 @@ public class NoneParserPlugin ...@@ -64,28 +65,43 @@ public class NoneParserPlugin
FileInput input, PageOutput output) FileInput input, PageOutput output)
{ {
PluginTask task = taskSource.loadTask(PluginTask.class); PluginTask task = taskSource.loadTask(PluginTask.class);
List<String> files = task.getFiles();
log.info(""+files);
FileInputInputStream dataIn = new FileInputInputStream(input); FileInputInputStream dataIn = new FileInputInputStream(input);
PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output); PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output);
String line = null; int chunksize = 1024 * 1024 * 10;
Integer offset = 0;
Integer bytes_read = 0;
Integer chunksize = 1024;
final String columnName = task.getColumnName();
while( input.nextFile() ){ while( input.nextFile() ){
while( true ) { byte[] pathBytesArray = new byte[MAX_NAME_LENGTH];
byte[] bytesArray = new byte[chunksize]; int i = 0;
int c;
for (; i < MAX_NAME_LENGTH; i++) {
c = dataIn.read();
if ( c == -1) {
break;
} else if ( c == 0 ) {
// read empty bytes until MAX_NAME_LENGTH;
for (int j = i + 1; j < MAX_NAME_LENGTH; j++) {
dataIn.read();
}
break;
}
pathBytesArray[i] = (byte)c;
}
String path = new String(Arrays.copyOfRange(pathBytesArray, 0, i));
int bytes_read = 0;
while( bytes_read != -1 ) {
byte bytesArray[] = new byte[chunksize];
int offset = 0;
while ( offset < chunksize ) {
bytes_read = dataIn.read(bytesArray, offset, chunksize); bytes_read = dataIn.read(bytesArray, offset, chunksize);
if( bytes_read == -1 ) { if( bytes_read == -1 ) {
break; break;
} }
offset += bytes_read; offset += bytes_read;
byte[] fittedBytesArray = Arrays.copyOfRange(bytesArray, 0, bytes_read); }
line = Base64.encodeBase64String(fittedBytesArray);
pageBuilder.setString(0, line); log.info(offset + path);
pageBuilder.setString(0, Base64.encodeBase64String(Arrays.copyOfRange(bytesArray, 0, offset)));
pageBuilder.setString(1, path);
pageBuilder.addRecord(); pageBuilder.addRecord();
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment