Commit cd1303ab by yu

add the comments

parent 554744bd
......@@ -12,10 +12,16 @@
- **multi_dir**: description (ArrayList<String>, required)
- **mulit_tag**: description (ArrayList<String>, default: `[]`)
- **order_by_modified_time**: description (int, default: `0`)
- **order_by_creation_time**: description (int, default: `0`)
- **order**: description (String, default: `ALPHABETICAL`)
- **chunk_size**: description (int, default: `10485760(10M)`)
for the order option. There are many alternative:
ALPHABETICAL (default value)
ASCEND_MODIFIED
DESCEND_MODIFIED
ASCEND_CREATION (useless in the unix system, because the unix does not record the creation time)
DESCEND_CREATION (useless in the unix system)
## Example
```yaml
......@@ -25,7 +31,7 @@ in:
type: filename
mulit_dir: ["../sample/sample_","../example/example_"]
multi_tag: ["tag1","tag2"]
order_by_modified_time: 1
order: ASCEND_MODIFIED
chunk_size: 1000
```
Attention:
......@@ -53,15 +59,6 @@ sample2.txt
sample3.txt
For the order_by_modified_time option, its default value is 0, the files in each directory are uploaded in alphabetical order.
If it equals 1, the files in each directory will be uploaded in the order of their last modified time.
if it equals neither 1 or 0, the files will be uploaded in descend order of their last modified time.
The order_by_modified_time is the same like order_by_modified_time. Those two option could not be set at same time since the you have to choose
only one order to upload the files. And the order_by_creation_time is useless in Unix system as the unix system does not record the creation
time of the files. Use the order_by_creation_time sparingly.
## Build
java 1.8 is required.
......
/*
Author: CAI Yu
Email: icaiyu0618@gmail.com
This plugin is aimed for upload the files in multi directories for the Embulk.
To use the plugin read the ReadMe.md carefully.
This plugin will parse the directories in the multi_dir parameter in your config.yml
The embulk will load the files in each directory. And each directory will be treated as a task.
The embulk will run those tasks in multi threads.
This plugin should be used with the WendlinPlugin.
*/
package org.embulk.input.filename;
import java.util.List;
......@@ -18,11 +33,7 @@ import java.io.IOException;
import java.io.FileInputStream;
import java.io.ByteArrayOutputStream;
import com.google.common.base.Optional;
import org.apache.commons.codec.binary.Base64;
//import org.apache.commons.io.IOUtils;
//import com.google.common.base.Optional;
import org.slf4j.Logger;
......@@ -74,6 +85,7 @@ public class FilenameInputPlugin
String getLoadOrder();
// Not implements yet this Configuration will decide whether upload the symlinks files.
@Config("follow_symlinks")
@ConfigDefault("false")
boolean getFollowSymlinks();
......@@ -88,32 +100,53 @@ public class FilenameInputPlugin
}
// initialize a log. Very useful tool to display the important information.
private final Logger log = Exec.getLogger(getClass());
// Get the CURRENT directory of the embulk runing.
private final static Path CURRENT_DIR = Paths.get(".").normalize();
// This varibale record the tags for each directory
private static ArrayList<String> tagList;
private static ArrayList<String> lastPaths;
// This varibale decide the chunkSize of the upload data.
private static int chunkSize;
// You need to learn the workflow of the embulk before understand how this transaction work.
@Override
public ConfigDiff transaction(ConfigSource config,
InputPlugin.Control control)
{
// load the task from the TaskSource. from this varibale, we could read the configuration in the config.yml
PluginTask task = config.loadConfig(PluginTask.class);
// Get the data chunk size
chunkSize = task.getChunkSize();
// Read the directories list from the task.
ArrayList<String> dirList = task.getMultiDir();
// Read the LastPath list from the task
ArrayList<String> lastPaths = task.getLastPaths();
// We create a big Array to contains all directories, and each directories will contain many files.
ArrayList<ArrayList<String>> allFiles = new ArrayList<ArrayList<String>>();
// Read the tags list from the task.
tagList = task.getMultiTag();
// If the dirList have no directory, we throw an RuntimeException.
if ( dirList.size() != 0 ){
log.info ("The list of the directories: " + dirList );
// If the Number of tags is less than the directories, we say that the default tag for the directory is ""
while (tagList.size() < dirList.size()){
// If the Number of tags is less than the directories, we say that the default tag is ""
tagList.add("");
}
// if the number of lastPaths is less than the directory, we say the default lastPaths for the directory is ""
while (lastPaths.size() < dirList.size()){
lastPaths.add("");
}
......@@ -122,16 +155,20 @@ public class FilenameInputPlugin
}
// Now to time to read all files from each directory.
for (int i =0; i < dirList.size();i++ ){
String dir = dirList.get(i);
String lastPath = lastPaths.get(i);
// We have to sort the files before we set them to the tasks.
// Here we get the parameter about the order
String order = task.getLoadOrder();
if (order.equals("")){order = "ALPHABETICAL";}
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order);
// Sort the files if each directory
// This method return the files in a directory,however the files in the varibale files is in random order. we have to sort them next
ArrayList<String> files = listFiles(task,Paths.get(dir).normalize(),lastPath,order);
// Sort the files if each directory
if (order.equals("ALPHABETICAL")){
Collections.sort(files);
} else if(order.equals("ASCEND_MODIFIED") || order.equals("DESCEND_MODIFIED")){
......@@ -169,29 +206,28 @@ public class FilenameInputPlugin
log.info("The files is " + files);
// add the files to the big Array.
allFiles.add(files);
}
// each directory will be treated as a task. the taskCount is the size of the bigArray.
int taskCount = allFiles.size();
log.info("taskCount of the input plugin is: " + taskCount);
// We set the allFiles to the tasks.files.
task.setFiles(allFiles);
// Here we add we columns for the Schema.
ArrayList<ColumnConfig> columns = new ArrayList<ColumnConfig>();
//final String columnName = task.getColumnName();
// Here we add two columns to the columns.
// if you want, we can read the columnName from the seed.yml: final String columnName = task.getColumnName();
columns.add(new ColumnConfig("payload", STRING, config));
columns.add(new ColumnConfig("tag", STRING, config));
// Create a Schema for the Page.
Schema schema = new SchemaConfig(columns).toSchema();
//Schema schema = task.getColumns().toSchema();
// number of run() method calls
log.info("TASKCOUNT " + taskCount);
return resume(task.dump(), schema, taskCount, control);
}
......@@ -211,6 +247,9 @@ public class FilenameInputPlugin
{
}
// This function will be run in every task. As we say that we distribute each directory as task.
// in this method. we have to load all files in the directory.
@Override
public TaskReport run(TaskSource taskSource,
Schema schema, int taskIndex,
......@@ -220,9 +259,12 @@ public class FilenameInputPlugin
ArrayList<String> files = task.getFiles().get(taskIndex);
// Check how many files in this directory on the run.
log.info("The files in the run:" + files);
// For each file, we create some pages.
// If the size of the file is less than chunkSize, we build just one page for this file.
// If the size of the file is more than chunkSize, we build more than one page for this file.
for (String file : files)
{
try
......@@ -232,6 +274,7 @@ public class FilenameInputPlugin
FileInputStream dataIn = new FileInputStream(file);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
// Read the data and build the page.
while ((nRead = dataIn.read(data, 0, data.length)) != -1) {
buffer.write(data, 0, nRead);
try (PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output))
......@@ -259,9 +302,9 @@ public class FilenameInputPlugin
return Exec.newConfigDiff();
}
// This method is for walk through the directory and record the files in the directory. It will compare the filename with the lastPath
// In we want to upload the files in ALPHABETICAL order. than the filename "smaller than" the lastPath will be abandonned.
// Be careful, that since we have alternative for the order. You should be careful what "smaller than" means!
public ArrayList<String> listFiles(PluginTask task,Path pathPrefix,String lastPath,String order)
{
//Path pathPrefix = Paths.get(task.getPathPrefix()).normalize();
......
......@@ -38,7 +38,8 @@ import static org.junit.Assert.assertThat;
public class TestFilenameInputPlugin
{
// This method is to return the creationTime of the file
public static FileTime getCreationTime(String filename) throws IOException{
File file = new File(filename);
Path p = Paths.get(file.getAbsolutePath());
......@@ -46,7 +47,8 @@ public class TestFilenameInputPlugin
FileTime fileTime = view.creationTime();
return fileTime;
}
// This method is to return the Last Modified time of the file.
public static FileTime getLastModifiedTime(String filename) throws IOException{
File file = new File(filename);
Path p = Paths.get(file.getAbsolutePath());
......@@ -55,7 +57,10 @@ public class TestFilenameInputPlugin
return fileTime;
}
// In this test, we need to use the TestHelper which will simulate the embulk.
// To embulk run with the plugin we want. You need to register the plugin first
// (Just like you need to configure the plugin in the configur.yml)
//
@Rule
public TestHelper embulk = TestHelper.builder()
.registerPlugin(InputPlugin.class,"filename",FilenameInputPlugin.class)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment