embulk storage plugins pre-installed to fix filename issue

parent 7c13154d
Embulk::JavaPlugin.register_input(
"ftp", "org.embulk.input.FtpFileInputPlugin",
File.expand_path('../../../../classpath', __FILE__))
Embulk::JavaPlugin.register_input(
"http", "org.embulk.input.http.HttpFileInputPlugin",
File.expand_path('../../../../classpath', __FILE__))
Embulk::JavaPlugin.register_input(
:s3, "org.embulk.input.s3.S3FileInputPlugin",
File.expand_path('../../../../classpath', __FILE__))
@@ -18,7 +18,11 @@ module Embulk
task_reports = yield(task)
next_config_diff = {}
@logger = LogManager.instance()
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
if task_reports.length > 0
@logger.info("Your ingested files will be available in the site in a few minutes. Thank for your patience.", print=TRUE)
else
@logger.info("No new files where processed for ingestion.", print=TRUE)
end
return next_config_diff
end
@@ -20,10 +20,12 @@ module Embulk
class BinaryParserPlugin < ParserPlugin
Plugin.register_parser("binary", self)
METADATA_FILE_NAME = "/.metadata_file"
def self.transaction(config, &control)
tool_dir = config.param('tool_dir', :string, default: ".")
@logger = LogManager.instance()
@logger.setFilename(tool_dir, "parser")
@logger.setFilename(tool_dir, "ingestion")
task = {
chunk_size: config.param('chunk_size', :float, default: 0) * DatasetUtils::MEGA,
supplier: config.param("supplier", :string, default: "parser"),
@@ -53,15 +55,31 @@ module Embulk
@logger = LogManager.instance()
while file = file_input.next_file
begin
filename = "file_from_#{task['input_plugin']}_#{task['date']}"
each_chunk(file, filename, task['chunk_size']) do |record|
metadata_file = Dir.pwd + METADATA_FILE_NAME
metadata = File.open(metadata_file) {|f| f.readline} if File.exist?(metadata_file)
File.delete(metadata_file) if File.exist?(metadata_file)
rescue Exception => e
@logger.error("An error occurred while getting file metadata: " + e.to_s)
@logger.error(e.backtrace)
end
begin
if metadata
extension = File.extname metadata
filename = metadata.reverse.sub(extension.reverse, "").reverse
extension.gsub! '.', ''
extension = extension == "" ? DatasetUtils::NONE_EXT : extension
else
filename = "file_from_#{task['input_plugin']}_#{task['date']}"
extension = @index.to_s.rjust(3, "0")
end
each_chunk(file, filename.chomp, extension.chomp, task['chunk_size']) do |record|
@page_builder.add(record)
end
@page_builder.finish
Index.instance().increase()
rescue java.lang.OutOfMemoryError
@logger.logOutOfMemoryError(path)
return
@logger.abortExecution()
rescue Exception => e
@logger.error("An error occurred during file ingestion: " + e.to_s, print=TRUE)
@logger.error(e.backtrace)
@@ -71,8 +89,7 @@ module Embulk
end
private
def each_chunk(file, filename, chunk_size=DatasetUtils::CHUNK_SIZE)
extension = @index.to_s.rjust(3, "0")
def each_chunk(file, filename, extension, chunk_size=DatasetUtils::CHUNK_SIZE)
npart = 0
next_byte = file.read(1)
first = TRUE
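The fix above turns on a simple derivation: when `.metadata_file` is present, its single line (the remote path of the file being ingested) is split into a base name and an extension; otherwise the parser falls back to the old `file_from_<plugin>_<date>` naming with a zero-padded index. A minimal sketch of that split, written in Java for illustration (the sample path and the "none" fallback, standing in for `DatasetUtils::NONE_EXT`, are assumptions):

public class FilenameSplitExample
{
    public static void main(String[] args)
    {
        String metadata = "reports/2019/daily.csv"; // line read from .metadata_file
        int dot = metadata.lastIndexOf('.');
        String filename = dot > 0 ? metadata.substring(0, dot) : metadata;
        String extension = dot > 0 ? metadata.substring(dot + 1) : "none"; // DatasetUtils::NONE_EXT
        System.out.println(filename + " / " + extension); // => reports/2019/daily / csv
    }
}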
package org.embulk.input;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import it.sauronsoftware.ftp4j.FTPAbortedException;
import it.sauronsoftware.ftp4j.FTPClient;
import it.sauronsoftware.ftp4j.FTPCommunicationListener;
import it.sauronsoftware.ftp4j.FTPConnector;
import it.sauronsoftware.ftp4j.FTPDataTransferException;
import it.sauronsoftware.ftp4j.FTPDataTransferListener;
import it.sauronsoftware.ftp4j.FTPException;
import it.sauronsoftware.ftp4j.FTPFile;
import it.sauronsoftware.ftp4j.FTPIllegalReplyException;
import it.sauronsoftware.ftp4j.FTPListParseException;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamFileInput;
import org.embulk.spi.util.ResumableInputStream;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.embulk.spi.util.RetryExecutor.Retryable;
import org.embulk.util.ftp.BlockingTransfer;
import org.embulk.util.ftp.SSLPlugins;
import org.embulk.util.ftp.SSLPlugins.SSLPluginConfig;
import org.slf4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.embulk.input.MetadataUtils;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
public class FtpFileInputPlugin
implements FileInputPlugin
{
private final Logger log = Exec.getLogger(FtpFileInputPlugin.class);
private static final int FTP_DEFAULT_PORT = 21;
private static final int FTPS_DEFAULT_PORT = 990;
private static final int FTPES_DEFAULT_PORT = 21;
public interface PluginTask
extends Task, SSLPlugins.SSLPluginTask
{
@Config("path_prefix")
public String getPathPrefix();
@Config("last_path")
@ConfigDefault("null")
public Optional<String> getLastPath();
@Config("incremental")
@ConfigDefault("true")
public boolean getIncremental();
@Config("host")
public String getHost();
@Config("port")
@ConfigDefault("null")
public Optional<Integer> getPort();
@Config("user")
@ConfigDefault("null")
public Optional<String> getUser();
@Config("password")
@ConfigDefault("null")
public Optional<String> getPassword();
@Config("passive_mode")
@ConfigDefault("true")
public boolean getPassiveMode();
@Config("ascii_mode")
@ConfigDefault("false")
public boolean getAsciiMode();
@Config("ssl")
@ConfigDefault("false")
public boolean getSsl();
@Config("ssl_explicit")
@ConfigDefault("true")
public boolean getSslExplicit();
public List<String> getFiles();
public void setFiles(List<String> files);
public SSLPluginConfig getSSLConfig();
public void setSSLConfig(SSLPluginConfig config);
@ConfigInject
public BufferAllocator getBufferAllocator();
}
@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
task.setSSLConfig(SSLPlugins.configure(task));
// list files recursively
List<String> files = listFiles(log, task);
task.setFiles(files);
log.info("Using files {}", files);
// TODO what if task.getFiles().isEmpty()?
// number of processors is same with number of files
return resume(task.dump(), task.getFiles().size(), control);
}
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
control.run(taskSource, taskCount);
// build next config
ConfigDiff configDiff = Exec.newConfigDiff();
// last_path
if (task.getIncremental()) {
if (task.getFiles().isEmpty()) {
// keep the last value
if (task.getLastPath().isPresent()) {
configDiff.set("last_path", task.getLastPath().get());
}
}
else {
List<String> files = new ArrayList<String>(task.getFiles());
Collections.sort(files);
configDiff.set("last_path", files.get(files.size() - 1));
}
}
return configDiff;
}
@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
// do nothing
}
private static FTPClient newFTPClient(Logger log, PluginTask task)
{
FTPClient client = new FTPClient();
try {
int defaultPort = FTP_DEFAULT_PORT;
if (task.getSsl()) {
client.setSSLSocketFactory(SSLPlugins.newSSLSocketFactory(task.getSSLConfig(), task.getHost()));
if (task.getSslExplicit()) {
client.setSecurity(FTPClient.SECURITY_FTPES);
defaultPort = FTPES_DEFAULT_PORT;
log.info("Using FTPES(FTPS/explicit) mode");
}
else {
client.setSecurity(FTPClient.SECURITY_FTPS);
defaultPort = FTPS_DEFAULT_PORT;
log.info("Using FTPS(FTPS/implicit) mode");
}
}
int port = task.getPort().isPresent() ? task.getPort().get() : defaultPort;
client.addCommunicationListener(new LoggingCommunicationListener(log));
// TODO configurable timeout parameters
client.setAutoNoopTimeout(3000);
FTPConnector con = client.getConnector();
con.setConnectionTimeout(30);
con.setReadTimeout(60);
con.setCloseTimeout(60);
// for commons-net client
//client.setControlKeepAliveTimeout
//client.setConnectTimeout
//client.setSoTimeout
//client.setDataTimeout
//client.setAutodetectUTF8
log.info("Connecting to {}:{}", task.getHost(), port);
client.connect(task.getHost(), port);
if (task.getUser().isPresent()) {
log.info("Logging in with user " + task.getUser().get());
client.login(task.getUser().get(), task.getPassword().or(""));
}
log.info("Using passive mode");
client.setPassive(task.getPassiveMode());
if (task.getAsciiMode()) {
log.info("Using ASCII mode");
client.setType(FTPClient.TYPE_TEXTUAL);
}
else {
log.info("Using binary mode");
client.setType(FTPClient.TYPE_BINARY);
}
if (client.isCompressionSupported()) {
log.info("Using MODE Z compression");
client.setCompressionEnabled(true);
}
FTPClient connected = client;
client = null;
return connected;
}
catch (FTPException ex) {
log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage());
throw Throwables.propagate(ex);
}
catch (FTPIllegalReplyException ex) {
log.info("FTP protocol error");
throw Throwables.propagate(ex);
}
catch (IOException ex) {
log.info("FTP network error: " + ex);
throw Throwables.propagate(ex);
}
finally {
if (client != null) {
disconnectClient(client);
}
}
}
static void disconnectClient(FTPClient client)
{
if (client.isConnected()) {
try {
client.disconnect(false);
}
catch (FTPException ex) {
// do nothing
}
catch (FTPIllegalReplyException ex) {
// do nothing
}
catch (IOException ex) {
// do nothing
}
}
}
private List<String> listFiles(Logger log, PluginTask task)
{
FTPClient client = newFTPClient(log, task);
try {
return listFilesByPrefix(log, client, task.getPathPrefix(), task.getLastPath());
}
finally {
disconnectClient(client);
}
}
public static List<String> listFilesByPrefix(Logger log, FTPClient client,
String prefix, Optional<String> lastPath)
{
String directory;
String fileNamePrefix;
if (prefix.isEmpty()) {
directory = "";
fileNamePrefix = "";
}
else {
int pos = prefix.lastIndexOf("/");
if (pos < 0) {
directory = "";
fileNamePrefix = prefix;
}
else {
directory = prefix.substring(0, pos + 1); // include last "/"
fileNamePrefix = prefix.substring(pos + 1);
}
}
ImmutableList.Builder<String> builder = ImmutableList.builder();
try {
String currentDirectory = client.currentDirectory();
log.info("Listing ftp files at directory '{}' filtering filename by prefix '{}'", directory.isEmpty() ? currentDirectory : directory, fileNamePrefix);
if (!directory.isEmpty()) {
client.changeDirectory(directory);
currentDirectory = directory;
}
for (FTPFile file : client.list()) {
if (file.getName().startsWith(fileNamePrefix)) {
listFilesRecursive(client, currentDirectory, file, lastPath, builder);
}
}
}
catch (FTPListParseException ex) {
log.info("FTP listing files failed");
throw Throwables.propagate(ex);
}
catch (FTPAbortedException ex) {
log.info("FTP listing files failed");
throw Throwables.propagate(ex);
}
catch (FTPDataTransferException ex) {
log.info("FTP data transfer failed");
throw Throwables.propagate(ex);
}
catch (FTPException ex) {
log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage());
throw Throwables.propagate(ex);
}
catch (FTPIllegalReplyException ex) {
log.info("FTP protocol error");
throw Throwables.propagate(ex);
}
catch (IOException ex) {
log.info("FTP network error: " + ex);
throw Throwables.propagate(ex);
}
return builder.build();
}
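// Worked example (illustrative, not part of the commit): for path_prefix
// "logs/2019/app" the split above yields directory "logs/2019/" and
// fileNamePrefix "app"; the client then changes into "logs/2019/" and keeps
// entries whose names start with "app". An empty prefix lists the current
// directory without any name filtering.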
private static void listFilesRecursive(FTPClient client,
String baseDirectoryPath, FTPFile file, Optional<String> lastPath,
ImmutableList.Builder<String> builder)
throws IOException, FTPException, FTPIllegalReplyException, FTPDataTransferException, FTPAbortedException, FTPListParseException
{
if (!baseDirectoryPath.endsWith("/")) {
baseDirectoryPath = baseDirectoryPath + "/";
}
String path = baseDirectoryPath + file.getName();
if (lastPath.isPresent() && path.compareTo(lastPath.get()) <= 0) {
return;
}
switch (file.getType()) {
case FTPFile.TYPE_FILE:
builder.add(path);
break;
case FTPFile.TYPE_DIRECTORY:
client.changeDirectory(path);
for (FTPFile subFile : client.list()) {
listFilesRecursive(client, path, subFile, lastPath, builder);
}
client.changeDirectory(baseDirectoryPath);
break;
case FTPFile.TYPE_LINK:
// TODO
}
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
return new FtpFileInput(log, task, taskIndex);
}
private static class LoggingCommunicationListener
implements FTPCommunicationListener
{
private final Logger log;
public LoggingCommunicationListener(Logger log)
{
this.log = log;
}
public void received(String statement)
{
log.info("< " + statement);
}
public void sent(String statement)
{
if (statement.startsWith("PASS")) {
// don't show password
return;
}
log.info("> " + statement);
}
}
private static class LoggingTransferListener
implements FTPDataTransferListener
{
private final Logger log;
private final long transferNoticeBytes;
private long totalTransfer;
private long nextTransferNotice;
public LoggingTransferListener(Logger log, long transferNoticeBytes)
{
this.log = log;
this.transferNoticeBytes = transferNoticeBytes;
this.nextTransferNotice = transferNoticeBytes;
}
public void started()
{
log.info("Transfer started");
}
public void transferred(int length)
{
totalTransfer += length;
if (totalTransfer > nextTransferNotice) {
log.info("Transferred " + totalTransfer + " bytes");
nextTransferNotice = ((totalTransfer / transferNoticeBytes) + 1) * transferNoticeBytes;
}
}
public void completed()
{
log.info("Transfer completed " + totalTransfer + " bytes");
}
public void aborted()
{
log.info("Transfer aborted");
}
public void failed()
{
log.info("Transfer failed");
}
}
private static final long TRANSFER_NOTICE_BYTES = 100 * 1024 * 1024;
private static InputStream startDownload(final Logger log, final FTPClient client,
final String path, final long offset, ExecutorService executor)
{
BlockingTransfer t = BlockingTransfer.submit(executor,
new Function<BlockingTransfer, Runnable>()
{
public Runnable apply(final BlockingTransfer transfer)
{
return new Runnable() {
public void run()
{
try {
client.download(path, Channels.newOutputStream(transfer.getWriterChannel()), offset, new LoggingTransferListener(log, TRANSFER_NOTICE_BYTES));
}
catch (FTPException ex) {
log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage());
throw Throwables.propagate(ex);
}
catch (FTPDataTransferException ex) {
log.info("FTP data transfer failed");
throw Throwables.propagate(ex);
}
catch (FTPAbortedException ex) {
log.info("FTP listing files failed");
throw Throwables.propagate(ex);
}
catch (FTPIllegalReplyException ex) {
log.info("FTP protocol error");
throw Throwables.propagate(ex);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
finally {
try {
transfer.getWriterChannel().close();
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
};
}
});
String metadata = path;
metadata = metadata.startsWith("/") ? metadata.substring(1) : metadata;
try
{
MetadataUtils metadataOb = new MetadataUtils();
metadataOb.saveMetadata(metadata);
}
catch (Exception e)
{
log.info("[ERROR] Could not store metadata: " + metadata);
}
return Channels.newInputStream(t.getReaderChannel());
}
private static class FtpInputStreamReopener
implements ResumableInputStream.Reopener
{
private final Logger log;
private final FTPClient client;
private final ExecutorService executor;
private final String path;
public FtpInputStreamReopener(Logger log, FTPClient client, ExecutorService executor, String path)
{
this.log = log;
this.client = client;
this.executor = executor;
this.path = path;
}
@Override
public InputStream reopen(final long offset, final Exception closedCause) throws IOException
{
try {
return retryExecutor()
.withRetryLimit(3)
.withInitialRetryWait(500)
.withMaxRetryWait(30 * 1000)
.runInterruptible(new Retryable<InputStream>() {
@Override
public InputStream call() throws InterruptedIOException
{
log.warn(String.format("FTP read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
return startDownload(log, client, path, offset, executor);
}
@Override
public boolean isRetryableException(Exception exception)
{
return true; // TODO
}
@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
throws RetryGiveupException
{
String message = String.format("FTP GET request failed. Retrying %d/%d after %d seconds. Message: %s",
retryCount, retryLimit, retryWait / 1000, exception.getMessage());
if (retryCount % 3 == 0) {
log.warn(message, exception);
}
else {
log.warn(message);
}
}
@Override
public void onGiveup(Exception firstException, Exception lastException)
throws RetryGiveupException
{
}
});
}
catch (RetryGiveupException ex) {
Throwables.propagateIfInstanceOf(ex.getCause(), IOException.class);
throw Throwables.propagate(ex.getCause());
}
catch (InterruptedException ex) {
throw new InterruptedIOException();
}
}
}
// TODO create single-file InputStreamFileInput utility
private static class SingleFileProvider
implements InputStreamFileInput.Provider
{
private final Logger log;
private final FTPClient client;
private final ExecutorService executor;
private final String path;
private boolean opened = false;
public SingleFileProvider(Logger log, PluginTask task, int taskIndex)
{
this.log = log;
this.client = newFTPClient(log, task);
this.executor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("embulk-input-ftp-transfer-%d")
.setDaemon(true)
.build());
this.path = task.getFiles().get(taskIndex);
}
@Override
public InputStream openNext() throws IOException
{
if (opened) {
return null;
}
opened = true;
return new ResumableInputStream(
startDownload(log, client, path, 0L, executor),
new FtpInputStreamReopener(log, client, executor, path));
}
@Override
public void close()
{
try {
executor.shutdownNow();
}
finally {
disconnectClient(client);
}
}
}
public static class FtpFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
public FtpFileInput(Logger log, PluginTask task, int taskIndex)
{
super(task.getBufferAllocator(), new SingleFileProvider(log, task, taskIndex));
}
public void abort()
{
}
public TaskReport commit()
{
return Exec.newTaskReport();
}
}
}
package org.embulk.input;
import java.io.PrintWriter;
public class MetadataUtils
{
public static final String METADATA_FILE_NAME = "/.metadata_file";
public void saveMetadata(String metadata)
{
String directory = System.getProperty("user.dir");
String metadataFile = directory.concat(METADATA_FILE_NAME);
// try-with-resources guarantees the writer is closed even on failure
try (PrintWriter writer = new PrintWriter(metadataFile, "UTF-8")) {
writer.println(metadata);
}
catch (Exception e)
{
System.out.println("[ERROR] Could not store metadata: " + metadata);
}
}
}
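MetadataUtils is the producer half of a small handshake: each storage plugin writes the remote path of the file it is about to stream into `<cwd>/.metadata_file`, and the binary parser (see the Ruby hunk above) reads the first line back, then deletes the file, to recover the original filename. A self-contained sketch of the round trip, under the assumption that both sides share the same working directory (class name and sample path are hypothetical):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MetadataHandshakeExample
{
    public static void main(String[] args) throws IOException
    {
        Path metadataFile = Paths.get(System.getProperty("user.dir"), ".metadata_file");

        // producer side (FTP/HTTP/S3 input) -- equivalent to saveMetadata(...)
        Files.write(metadataFile, "reports/2019/daily.csv\n".getBytes(StandardCharsets.UTF_8));

        // consumer side (the parser) -- read the first line, then clean up
        String metadata = Files.readAllLines(metadataFile, StandardCharsets.UTF_8).get(0);
        Files.deleteIfExists(metadataFile);
        System.out.println(metadata); // => reports/2019/daily.csv
    }
}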
package org.embulk.input.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
public class BasicAuthOption
{
private final String user;
private final String password;
@JsonCreator
public BasicAuthOption(@JsonProperty("user") String user,
@JsonProperty("password") String password)
{
this.user = user;
this.password = password;
}
@JsonProperty("user")
public String getUser()
{
return user;
}
@JsonProperty("password")
public String getPassword()
{
return password;
}
@Override
public int hashCode()
{
return Objects.hashCode(user, password);
}
@Override
public String toString()
{
return String.format("BasicAuthOption[%s, %s]", getUser(), getPassword());
}
}
package org.embulk.input.http;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicNameValuePair;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamFileInput;
import org.embulk.spi.util.RetryExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.net.URI;
import org.embulk.input.MetadataUtils;
import static java.lang.String.format;
public class HttpFileInputPlugin implements FileInputPlugin
{
private final Logger logger = Exec.getLogger(getClass());
public interface PluginTask extends Task
{
@Config("url")
String getUrl();
@Config("charset")
@ConfigDefault("\"utf-8\"")
String getCharset();
@Config("method")
@ConfigDefault("\"get\"")
String getMethod();
@Config("user_agent")
@ConfigDefault("\"Embulk::Input::HttpFileInputPlugin\"")
String getUserAgent();
@Config("open_timeout")
@ConfigDefault("2000")
int getOpenTimeout();
@Config("read_timeout")
@ConfigDefault("10000")
int getReadTimeout();
@Config("max_retries")
@ConfigDefault("5")
int getMaxRetries();
@Config("retry_interval")
@ConfigDefault("10000")
int getRetryInterval();
@Config("request_interval")
@ConfigDefault("0")
int getRequestInterval();
void setRequestInterval(int requestInterval);
@Config("interval_includes_response_time")
@ConfigDefault("null")
boolean getIntervalIncludesResponseTime();
@Config("input_direct")
@ConfigDefault("true")
boolean getInputDirect();
@Config("params")
@ConfigDefault("null")
Optional<ParamsOption> getParams();
@Config("request_body")
@ConfigDefault("null")
Optional<String> getRequestBody();
@Config("basic_auth")
@ConfigDefault("null")
Optional<BasicAuthOption> getBasicAuth();
@Config("pager")
@ConfigDefault("null")
Optional<PagerOption> getPager();
@Config("request_headers")
@ConfigDefault("{}")
Map<String, String> getRequestHeaders();
@ConfigInject
BufferAllocator getBufferAllocator();
List<List<QueryOption.Query>> getQueries();
void setQueries(List<List<QueryOption.Query>> queries);
HttpMethod getHttpMethod();
void setHttpMethod(HttpMethod httpMethod);
}
public enum HttpMethod
{
POST,
GET
}
@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
final int tasks;
if (task.getParams().isPresent()) {
List<List<QueryOption.Query>> queries = task.getParams().get().generateQueries(task.getPager());
task.setQueries(queries);
tasks = queries.size();
}
else if (task.getPager().isPresent()) {
List<List<QueryOption.Query>> queries = task.getPager().get().expand();
task.setQueries(queries);
tasks = queries.size();
}
else {
task.setQueries(Lists.<List<QueryOption.Query>>newArrayList());
task.setRequestInterval(0);
tasks = 1;
}
task.setHttpMethod(HttpMethod.valueOf(task.getMethod().toUpperCase()));
return resume(task.dump(), tasks, control);
}
@Override
public ConfigDiff resume(TaskSource taskSource, int taskCount, FileInputPlugin.Control control)
{
control.run(taskSource, taskCount);
return Exec.newConfigDiff();
}
@Override
public void cleanup(TaskSource taskSource, int taskCount, List<TaskReport> successTaskReports)
{
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
HttpRequestBase request;
try {
request = makeRequest(task, taskIndex);
}
catch (URISyntaxException | UnsupportedEncodingException e) {
throw Throwables.propagate(e);
}
HttpClientBuilder builder = HttpClientBuilder.create()
.disableAutomaticRetries()
.setDefaultRequestConfig(makeRequestConfig(task))
.setDefaultHeaders(makeHeaders(task));
if (task.getBasicAuth().isPresent()) {
builder.setDefaultCredentialsProvider(makeCredentialsProvider(task.getBasicAuth().get(), request));
}
HttpClient client = builder.build();
logger.info(format(Locale.ENGLISH, "%s \"%s\"", task.getMethod().toUpperCase(), request.getURI().toString()));
RetryableHandler retryable = new RetryableHandler(client, request);
long startTimeMills = System.currentTimeMillis();
try {
RetryExecutor.retryExecutor().
withRetryLimit(task.getMaxRetries()).
withInitialRetryWait(task.getRetryInterval()).
withMaxRetryWait(30 * 60 * 1000).
runInterruptible(retryable);
InputStream stream = retryable.getResponse().getEntity().getContent();
if (!task.getInputDirect()) {
stream = copyToFile(stream);
}
try
{
URI uri = new URI(task.getUrl());
String metadata = uri.getHost() + uri.getPath();
MetadataUtils metadataOb = new MetadataUtils();
metadataOb.saveMetadata(metadata);
}
catch (Exception e)
{
System.out.println("[ERROR] Could not get/store metadata from url " + task.getUrl());
}
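// e.g. url "https://example.com/data/file.csv" yields metadata
// "example.com/data/file.csv" (scheme, port and query string are dropped)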
PluginFileInput input = new PluginFileInput(task, stream, startTimeMills);
stream = null;
return input;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
private InputStream copyToFile(InputStream input)
throws IOException
{
File tmpfile = Files.createTempFile("embulk-input-http.", ".tmp").toFile();
tmpfile.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(tmpfile)) {
logger.info(format(Locale.ENGLISH, "Writing response to %s", tmpfile));
IOUtils.copy(input, output);
} finally {
input.close();
}
return new FileInputStream(tmpfile);
}
private CredentialsProvider makeCredentialsProvider(BasicAuthOption basicAuth, HttpRequestBase request)
{
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
final AuthScope authScope = new AuthScope(request.getURI().getHost(),
request.getURI().getPort());
credentialsProvider.setCredentials(authScope,
new UsernamePasswordCredentials(basicAuth.getUser(), basicAuth.getPassword()));
return credentialsProvider;
}
private HttpRequestBase makeRequest(PluginTask task, int taskIndex)
throws URISyntaxException, UnsupportedEncodingException
{
final List<QueryOption.Query> queries = (task.getQueries().isEmpty()) ?
null : task.getQueries().get(taskIndex);
if (task.getHttpMethod() == HttpMethod.GET) {
HttpGet request = new HttpGet(task.getUrl());
if (queries != null) {
URIBuilder builder = new URIBuilder(request.getURI());
for (QueryOption.Query q : queries) {
for (String v : q.getValues()) {
builder.addParameter(q.getName(), v);
}
}
request.setURI(builder.build());
}
return request;
}
if (task.getHttpMethod() == HttpMethod.POST) {
HttpPost request = new HttpPost(task.getUrl());
if (queries != null) {
List<NameValuePair> pairs = new ArrayList<>();
for (QueryOption.Query q : queries) {
for (String v : q.getValues()) {
pairs.add(new BasicNameValuePair(q.getName(), v));
}
}
request.setEntity(new UrlEncodedFormEntity(pairs));
}
else if (task.getRequestBody().isPresent()) {
logger.info(task.getRequestBody().get());
request.setEntity(new StringEntity(task.getRequestBody().get()));
}
return request;
}
throw new IllegalArgumentException(String.format("Unsupported http method %s", task.getMethod()));
}
private List<Header> makeHeaders(PluginTask task)
{
List<Header> headers = new ArrayList<>();
headers.add(new BasicHeader("Accept", "*/*"));
headers.add(new BasicHeader("Accept-Charset", task.getCharset()));
headers.add(new BasicHeader("Accept-Encoding", "gzip, deflate"));
headers.add(new BasicHeader("Accept-Language", "en-us,en;q=0.5"));
headers.add(new BasicHeader("User-Agent", task.getUserAgent()));
for (Map.Entry<String, String> entry : task.getRequestHeaders().entrySet()) {
headers.add(new BasicHeader(entry.getKey(), entry.getValue()));
}
return headers;
}
private RequestConfig makeRequestConfig(PluginTask task)
{
return RequestConfig.custom()
.setCircularRedirectsAllowed(true)
.setMaxRedirects(10)
.setRedirectsEnabled(true)
.setConnectTimeout(task.getOpenTimeout())
.setSocketTimeout(task.getReadTimeout())
.build();
}
public static class PluginFileInput extends InputStreamFileInput
implements TransactionalFileInput
{
private final Logger logger = Exec.getLogger(getClass());
private final long startTimeMills;
private final PluginTask task;
public PluginFileInput(PluginTask task, InputStream stream, long startTimeMills)
{
super(task.getBufferAllocator(), new SingleFileProvider(stream));
this.startTimeMills = startTimeMills;
this.task = task;
}
public TaskReport commit()
{
return Exec.newTaskReport();
}
@Override
public void close()
{
super.close();
handleInterval();
}
@Override
public void abort()
{
}
protected void handleInterval()
{
if (task.getRequestInterval() <= 0) {
return;
}
long interval = task.getRequestInterval();
if (task.getIntervalIncludesResponseTime()) {
interval = interval - (System.currentTimeMillis() - startTimeMills);
}
if (interval > 0) {
logger.info(String.format("waiting %d msec ...", interval));
try {
Thread.sleep(interval);
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
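// Worked example (illustrative): with request_interval 5000 and
// interval_includes_response_time true, a request that took 1200 ms sleeps
// for the remaining 3800 ms; with the flag false, every request is followed
// by the full 5000 ms wait.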
private static class SingleFileProvider
implements InputStreamFileInput.Provider
{
private final InputStream stream;
private boolean opened = false;
public SingleFileProvider(InputStream stream)
{
this.stream = stream;
}
@Override
public InputStream openNext() throws IOException
{
if (opened) {
return null;
}
opened = true;
return stream;
}
@Override
public void close() throws IOException
{
if (!opened) {
stream.close();
}
}
}
}
}
package org.embulk.input.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.List;
public class PagerOption
{
private final String fromParam;
private final Optional<String> toParam;
private final int start;
private final int pages;
private final int step;
@JsonCreator
public PagerOption(@JsonProperty("from_param") String fromParam,
@JsonProperty("to_param") Optional<String> toParam,
@JsonProperty("start") Optional<Integer> start,
@JsonProperty("pages") int pages,
@JsonProperty("step") Optional<Integer> step)
{
this.fromParam = fromParam;
this.toParam = toParam;
this.start = start.or(0);
this.pages = pages;
this.step = step.or(1);
}
public List<List<QueryOption.Query>> expand()
{
List<List<QueryOption.Query>> queries = new ArrayList<>();
int p = 1;
int index = start;
while (p <= pages) {
List<QueryOption.Query> one = new ArrayList<>();
one.add(new QueryOption.Query(fromParam, Integer.toString(index)));
if (toParam.isPresent()) {
int t = index + step - 1;
one.add(new QueryOption.Query(toParam.get(), Integer.toString(t)));
index = t + 1;
}
else {
index += step;
}
queries.add(one);
p++;
}
return queries;
}
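// Worked example (illustrative): from_param "from", to_param "to", start 0,
// pages 2, step 10 expands to [from=0, to=9] and [from=10, to=19];
// without to_param the same settings produce [from=0] and [from=10].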
@JsonProperty("from_param")
public String getFromParam()
{
return fromParam;
}
@JsonProperty("to_param")
public Optional<String> getToParam()
{
return toParam;
}
@JsonProperty("start")
public int getStart()
{
return start;
}
@JsonProperty("pages")
public int getPages()
{
return pages;
}
@JsonProperty("step")
public int getStep()
{
return step;
}
@Override
public String toString()
{
return "PagerOption{" +
"fromParam='" + fromParam + '\'' +
", toParam=" + toParam +
", start=" + start +
", pages=" + pages +
", step=" + step +
'}';
}
}
package org.embulk.input.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.List;
public class ParamsOption
{
private final List<QueryOption> queries;
@JsonCreator
public ParamsOption(List<QueryOption> queries)
{
this.queries = queries;
}
@JsonValue
public List<QueryOption> getQueries()
{
return queries;
}
public List<List<QueryOption.Query>> generateQueries(Optional<PagerOption> pagerOption)
{
List<List<QueryOption.Query>> base = new ArrayList<>(queries.size());
for (QueryOption p : queries) {
base.add(p.expand());
}
int productSize = 1;
for (List<QueryOption.Query> list : base) {
productSize *= list.size();
}
List<List<QueryOption.Query>> expands = new ArrayList<>(productSize);
for (int i = 0; i < productSize; i++) {
int j = 1;
List<QueryOption.Query> one = new ArrayList<>();
for (List<QueryOption.Query> list : base) {
QueryOption.Query pc = list.get((i / j) % list.size());
one.add(pc);
j *= list.size();
}
if (pagerOption.isPresent()) {
for (List<QueryOption.Query> q : pagerOption.get().expand()) {
expands.add(copyAndConcat(one, q));
}
}
else {
expands.add(one);
}
}
return expands;
}
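// Worked example (illustrative): params a: {value: "{1,2}", expand: true}
// and b: {value: "x"} give base [[a=1, a=2], [b=x]]; the index arithmetic
// above walks the cartesian product, yielding [a=1, b=x] and [a=2, b=x].
// When a pager is present, each combination is copied once per page.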
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (!(obj instanceof ParamsOption)) {
return false;
}
ParamsOption other = (ParamsOption) obj;
return Objects.equal(queries, other.queries);
}
@Override
public int hashCode()
{
return Objects.hashCode(queries);
}
private List<QueryOption.Query> copyAndConcat(List<QueryOption.Query>... srcs)
{
List<QueryOption.Query> dest = new ArrayList<>();
for (List<QueryOption.Query> src : srcs) {
for (QueryOption.Query q : src) {
dest.add(q.copy());
}
}
return dest;
}
}
package org.embulk.input.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class QueryOption
{
private final String name;
private final Optional<String> value;
private final Optional<List<String>> values;
private final boolean expand;
@JsonCreator
public QueryOption(@JsonProperty("name") String name,
@JsonProperty("value") Optional<String> value,
@JsonProperty("values") Optional<List<String>> values,
@JsonProperty("expand") boolean expand)
{
this.name = name;
this.value = value;
this.values = values;
this.expand = expand;
}
public List<Query> expand()
{
List<Query> dest;
if (value.isPresent()) {
if (expand) {
List<String> expanded = BraceExpansion.expand(value.get());
dest = new ArrayList<>(expanded.size());
for (String s : expanded) {
dest.add(new Query(name, s));
}
}
else {
dest = new ArrayList<>(1);
dest.add(new Query(name, value.get()));
}
}
else if (values.isPresent()) {
if (expand) {
dest = new ArrayList<>(values.get().size());
for (String s : values.get()) {
dest.add(new Query(name, s));
}
}
else {
dest = new ArrayList<>(1);
final String[] valueArr = values.get().toArray(new String[values.get().size()]);
dest.add(new Query(name, valueArr));
}
}
else {
throw new IllegalArgumentException("value or values must be specified to 'params'");
}
return dest;
}
@JsonProperty("name")
public String getName()
{
return name;
}
@JsonProperty("value")
public Optional<String> getValue()
{
return value;
}
@JsonProperty("expand")
public boolean isExpand()
{
return expand;
}
@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (!(obj instanceof QueryOption)) {
return false;
}
QueryOption other = (QueryOption) obj;
return Objects.equal(this.name, other.name) &&
Objects.equal(value, other.value) &&
Objects.equal(expand, other.expand);
}
@Override
public int hashCode()
{
return Objects.hashCode(name, value, expand);
}
@Override
public String toString()
{
return String.format("ParameterConfig[%s, %s, %s]",
getName(), getValue(), isExpand());
}
public static class Query
{
private final String name;
private final String[] values;
public Query(@JsonProperty("name") String name,
@JsonProperty("values") String... values)
{
this.name = name;
this.values = values;
}
public String getName()
{
return name;
}
public String[] getValues()
{
return values;
}
public Query copy()
{
return new Query(this.name, Arrays.copyOf(this.values, this.values.length));
}
}
private static class BraceExpansion
{
public static List<String> expand(String s)
{
return expandRecursive("", s, "", new ArrayList<String>());
}
private static List<String> expandRecursive(String prefix, String s,
String suffix, List<String> dest)
{
// used the code below as reference.
// http://rosettacode.org/wiki/Brace_expansion#Java
int i1 = -1;
int i2 = 0;
String noEscape = s.replaceAll("([\\\\]{2}|[\\\\][,}{])", " ");
StringBuilder sb = null;
outer:
while ((i1 = noEscape.indexOf('{', i1 + 1)) != -1) {
i2 = i1 + 1;
sb = new StringBuilder(s);
for (int depth = 1; i2 < s.length() && depth > 0; i2++) {
char c = noEscape.charAt(i2);
depth = (c == '{') ? ++depth : depth;
depth = (c == '}') ? --depth : depth;
if (c == ',' && depth == 1) {
sb.setCharAt(i2, '\u0000');
}
else if (c == '}' && depth == 0 && sb.indexOf("\u0000") != -1) {
break outer;
}
}
}
if (i1 == -1) {
if (suffix.length() > 0) {
expandRecursive(prefix + s, suffix, "", dest);
}
else {
final String out = String.format("%s%s%s", prefix, s, suffix).
replaceAll("[\\\\]{2}", "\\").replaceAll("[\\\\]([,}{])", "$1");
dest.add(out);
}
}
else {
for (String m : sb.substring(i1 + 1, i2).split("\u0000", -1)) {
expandRecursive(prefix + s.substring(0, i1), m, s.substring(i2 + 1) + suffix, dest);
}
}
return dest;
}
}
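// Illustrative examples: expand("q{a,b}") returns ["qa", "qb"]; nesting
// expands recursively, e.g. expand("{a,b{1,2}}") returns ["a", "b1", "b2"];
// escaped sequences such as "\\{" and "\\," are kept as literals.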
}
package org.embulk.input.http;
import com.google.common.collect.ImmutableList;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.util.EntityUtils;
import org.embulk.spi.Exec;
import org.embulk.spi.util.RetryExecutor;
import org.slf4j.Logger;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.UnknownHostException;
import java.util.List;
public class RetryableHandler implements RetryExecutor.Retryable
{
protected final Logger logger = Exec.getLogger(getClass());
private static final List<Class<? extends IOException>> NOT_RETRIABLE_CLASSES = ImmutableList.of(UnknownHostException.class,
InterruptedIOException.class, SSLException.class);
private final HttpClient client;
private final HttpRequestBase request;
private HttpResponse response;
public RetryableHandler(HttpClient client, HttpRequestBase request)
{
this.client = client;
this.request = request;
}
public HttpResponse getResponse()
{
return response;
}
@Override
public Object call() throws Exception
{
if (response != null) {
throw new IllegalStateException("response is already set");
}
HttpResponse response = client.execute(request);
statusIsOkOrThrow(response);
this.response = response;
return null;
}
@Override
public boolean isRetryableException(Exception exception)
{
if (NOT_RETRIABLE_CLASSES.contains(exception.getClass())) {
logger.error(String.format("'%s' is not retriable", exception.getClass()));
return false;
}
return true;
}
@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
throws RetryExecutor.RetryGiveupException
{
logger.warn("retrying {}/{} after {} seconds. Message: {}",
retryCount, retryLimit, retryWait / 1000,
exception.getMessage());
}
@Override
public void onGiveup(Exception firstException, Exception lastException)
throws RetryExecutor.RetryGiveupException
{
logger.error("giveup {}", lastException.getMessage());
}
protected void statusIsOkOrThrow(HttpResponse response)
throws HttpException, IOException
{
int code = response.getStatusLine().getStatusCode();
switch (code) {
case 200:
return;
default:
throw new HttpException(String.format("Request is not successful, code=%d, body=%s",
code, EntityUtils.toString(response.getEntity())));
}
}
}
package org.embulk.input.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamFileInput;
import org.embulk.spi.util.ResumableInputStream;
import org.embulk.spi.util.RetryExecutor;
import org.embulk.util.aws.credentials.AwsCredentials;
import org.embulk.util.aws.credentials.AwsCredentialsTask;
import org.slf4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import org.embulk.input.MetadataUtils;
import static java.lang.String.format;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
public abstract class AbstractS3FileInputPlugin
implements FileInputPlugin
{
private static final Logger LOGGER = Exec.getLogger(S3FileInputPlugin.class);
public interface PluginTask
extends AwsCredentialsTask, FileList.Task, RetrySupportPluginTask, Task
{
@Config("bucket")
public String getBucket();
@Config("path_prefix")
@ConfigDefault("null")
public Optional<String> getPathPrefix();
@Config("path")
@ConfigDefault("null")
public Optional<String> getPath();
@Config("last_path")
@ConfigDefault("null")
public Optional<String> getLastPath();
@Config("access_key_id")
@ConfigDefault("null")
public Optional<String> getAccessKeyId();
@Config("http_proxy")
@ConfigDefault("null")
public Optional<HttpProxy> getHttpProxy();
public void setHttpProxy(Optional<HttpProxy> httpProxy);
@Config("incremental")
@ConfigDefault("true")
public boolean getIncremental();
@Config("skip_glacier_objects")
@ConfigDefault("false")
public boolean getSkipGlacierObjects();
// TODO timeout, ssl, etc
public FileList getFiles();
public void setFiles(FileList files);
@ConfigInject
public BufferAllocator getBufferAllocator();
}
protected abstract Class<? extends PluginTask> getTaskClass();
@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(getTaskClass());
validateInputTask(task);
// list files recursively
task.setFiles(listFiles(task));
// number of processors is same with number of files
return resume(task.dump(), task.getFiles().getTaskCount(), control);
}
@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
PluginTask task = taskSource.loadTask(getTaskClass());
// validate task
newS3Client(task);
control.run(taskSource, taskCount);
// build next config
ConfigDiff configDiff = Exec.newConfigDiff();
// last_path
if (task.getIncremental()) {
Optional<String> lastPath = task.getFiles().getLastPath(task.getLastPath());
LOGGER.info("Incremental job, setting last_path to [{}]", lastPath.orNull());
configDiff.set("last_path", lastPath);
}
return configDiff;
}
@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
// do nothing
}
/**
* Provides an overridable default client.
* Since this returns an immutable object, it cannot be customized further by mutation;
* e.g., {@link AmazonS3#setEndpoint} will throw a runtime {@link UnsupportedOperationException}.
* Subclass customization should be done through {@link AbstractS3FileInputPlugin#defaultS3ClientBuilder}.
* @param task Embulk plugin task
* @return AmazonS3
*/
protected AmazonS3 newS3Client(PluginTask task)
{
return defaultS3ClientBuilder(task).build();
}
/**
* A base builder for subclasses to customize.
* @param task Embulk plugin task
* @return AmazonS3ClientBuilder
**/
protected AmazonS3ClientBuilder defaultS3ClientBuilder(PluginTask task)
{
return AmazonS3ClientBuilder
.standard()
.withCredentials(getCredentialsProvider(task))
.withClientConfiguration(getClientConfiguration(task));
}
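// Example (hypothetical subclass, not part of this commit): customize via
// defaultS3ClientBuilder() instead of mutating the built client, e.g.
//
//   @Override
//   protected AmazonS3ClientBuilder defaultS3ClientBuilder(PluginTask task)
//   {
//       return super.defaultS3ClientBuilder(task).withRegion("eu-west-1");
//   }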
protected AWSCredentialsProvider getCredentialsProvider(PluginTask task)
{
return AwsCredentials.getAWSCredentialsProvider(task);
}
protected ClientConfiguration getClientConfiguration(PluginTask task)
{
ClientConfiguration clientConfig = new ClientConfiguration();
// PLT-9886: disable built-in retry
//clientConfig.setProtocol(Protocol.HTTP);
// clientConfig.setMaxConnections(50); // SDK default: 50
// clientConfig.setMaxErrorRetry(3); // SDK default: 3
// clientConfig.setSocketTimeout(8 * 60 * 1000); // SDK default: 50*1000
clientConfig.setRetryPolicy(PredefinedRetryPolicies.NO_RETRY_POLICY);
// set http proxy
if (task.getHttpProxy().isPresent()) {
setHttpProxyInAwsClient(clientConfig, task.getHttpProxy().get());
}
return clientConfig;
}
private void setHttpProxyInAwsClient(ClientConfiguration clientConfig, HttpProxy httpProxy)
{
// host
clientConfig.setProxyHost(httpProxy.getHost());
// port
if (httpProxy.getPort().isPresent()) {
clientConfig.setProxyPort(httpProxy.getPort().get());
}
// https
clientConfig.setProtocol(httpProxy.getHttps() ? Protocol.HTTPS : Protocol.HTTP);
// user
if (httpProxy.getUser().isPresent()) {
clientConfig.setProxyUsername(httpProxy.getUser().get());
}
// password
if (httpProxy.getPassword().isPresent()) {
clientConfig.setProxyPassword(httpProxy.getPassword().get());
}
}
/**
* Build the common retry executor from some configuration params of plugin task.
* @param task Plugin task.
* @return RetryExecutor object
*/
private static RetryExecutor retryExecutorFrom(RetrySupportPluginTask task)
{
return retryExecutor()
.withRetryLimit(task.getMaximumRetries())
.withInitialRetryWait(task.getInitialRetryIntervalMillis())
.withMaxRetryWait(task.getMaximumRetryIntervalMillis());
}
private FileList listFiles(final PluginTask task)
{
try {
AmazonS3 client = newS3Client(task);
String bucketName = task.getBucket();
FileList.Builder builder = new FileList.Builder(task);
RetryExecutor retryExec = retryExecutorFrom(task);
if (task.getPath().isPresent()) {
LOGGER.info("Start getting object with path: [{}]", task.getPath().get());
addS3DirectObject(builder, client, task.getBucket(), task.getPath().get(), retryExec);
}
else {
// no need to verify the path prefix here; validation already requires either path or path_prefix
LOGGER.info("Start listing file with prefix [{}]", task.getPathPrefix().get());
if (task.getPathPrefix().get().equals("/")) {
LOGGER.info("Listing files with prefix \"/\". This doesn't mean all files in a bucket. If you intend to read all files, use \"path_prefix: ''\" (empty string) instead.");
}
listS3FilesByPrefix(builder, client, bucketName,
task.getPathPrefix().get(), task.getLastPath(), task.getSkipGlacierObjects(), retryExec);
LOGGER.info("Found total [{}] files", builder.size());
}
return builder.build();
}
catch (AmazonServiceException ex) {
if (ex.getErrorType().equals(AmazonServiceException.ErrorType.Client)) {
// HTTP 40x errors. auth error, bucket doesn't exist, etc. See AWS document for the full list:
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if (ex.getStatusCode() != 400 // non-400 client errors (auth error, missing bucket, ...) are configuration errors
|| "ExpiredToken".equalsIgnoreCase(ex.getErrorCode())) { // 400 with ExpiredToken is also a ConfigException
throw new ConfigException(ex);
}
}
throw ex;
}
}
@VisibleForTesting
public void addS3DirectObject(FileList.Builder builder,
final AmazonS3 client,
String bucket,
String objectKey)
{
addS3DirectObject(builder, client, bucket, objectKey, null);
}
@VisibleForTesting
public void addS3DirectObject(FileList.Builder builder,
final AmazonS3 client,
String bucket,
String objectKey,
RetryExecutor retryExec)
{
final GetObjectMetadataRequest objectMetadataRequest = new GetObjectMetadataRequest(bucket, objectKey);
ObjectMetadata objectMetadata = new DefaultRetryable<ObjectMetadata>("Looking up for a single object") {
@Override
public ObjectMetadata call()
{
return client.getObjectMetadata(objectMetadataRequest);
}
}.executeWith(retryExec);
builder.add(objectKey, objectMetadata.getContentLength());
}
private void validateInputTask(PluginTask task)
{
if (!task.getPathPrefix().isPresent() && !task.getPath().isPresent()) {
throw new ConfigException("Either path or path_prefix is required");
}
}
@VisibleForTesting
public static void listS3FilesByPrefix(FileList.Builder builder,
final AmazonS3 client,
String bucketName,
String prefix,
Optional<String> lastPath,
boolean skipGlacierObjects)
{
listS3FilesByPrefix(builder, client, bucketName, prefix, lastPath, skipGlacierObjects, null);
}
/**
* Lists S3 filenames filtered by prefix.
* <p>
* The resulting list does not include files whose size is 0.
* @param builder custom FileList builder
* @param client Amazon S3 client
* @param bucketName Amazon S3 bucket name
* @param prefix Amazon S3 key prefix
* @param lastPath last path
* @param skipGlacierObjects skip Glacier objects
* @param retryExec a retry executor object to do the retrying
*/
@VisibleForTesting
public static void listS3FilesByPrefix(FileList.Builder builder,
final AmazonS3 client,
String bucketName,
String prefix,
Optional<String> lastPath,
boolean skipGlacierObjects,
RetryExecutor retryExec)
{
String lastKey = lastPath.orNull();
do {
final String finalLastKey = lastKey;
final ListObjectsRequest req = new ListObjectsRequest(bucketName, prefix, finalLastKey, null, 1024);
ObjectListing ol = new DefaultRetryable<ObjectListing>("Listing objects") {
@Override
public ObjectListing call()
{
return client.listObjects(req);
}
}.executeWith(retryExec);
for (S3ObjectSummary s : ol.getObjectSummaries()) {
if (s.getStorageClass().equals(StorageClass.Glacier.toString())) {
if (skipGlacierObjects) {
Exec.getLogger("AbstractS3FileInputPlugin.class").warn("Skipped \"s3://{}/{}\" that stored at Glacier.", bucketName, s.getKey());
continue;
}
else {
throw new ConfigException("Detected an object stored at Glacier. Set \"skip_glacier_objects\" option to \"true\" to skip this.");
}
}
if (s.getSize() > 0) {
builder.add(s.getKey(), s.getSize());
if (!builder.needsMore()) {
LOGGER.warn("Too many files matched, stop listing file");
return;
}
}
}
lastKey = ol.getNextMarker();
} while (lastKey != null);
}
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
PluginTask task = taskSource.loadTask(getTaskClass());
return new S3FileInput(task, taskIndex);
}
@VisibleForTesting
static class S3InputStreamReopener
implements ResumableInputStream.Reopener
{
private final Logger log = Exec.getLogger(S3InputStreamReopener.class);
private final AmazonS3 client;
private final GetObjectRequest request;
private final long contentLength;
private final RetryExecutor retryExec;
public S3InputStreamReopener(AmazonS3 client, GetObjectRequest request, long contentLength)
{
this(client, request, contentLength, null);
}
public S3InputStreamReopener(AmazonS3 client, GetObjectRequest request, long contentLength, RetryExecutor retryExec)
{
this.client = client;
this.request = request;
this.contentLength = contentLength;
this.retryExec = retryExec;
}
@Override
public InputStream reopen(final long offset, final Exception closedCause) throws IOException
{
log.warn(format("S3 read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
request.setRange(offset, contentLength - 1); // [first, last]
return new DefaultRetryable<S3ObjectInputStream>(format("Getting object '%s'", request.getKey())) {
@Override
public S3ObjectInputStream call()
{
return client.getObject(request).getObjectContent();
}
}.executeWithCheckedException(retryExec, IOException.class);
}
}
public class S3FileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
public S3FileInput(PluginTask task, int taskIndex)
{
super(task.getBufferAllocator(), new SingleFileProvider(task, taskIndex));
}
public void abort()
{
}
public TaskReport commit()
{
return Exec.newTaskReport();
}
@Override
public void close()
{
}
}
// TODO create single-file InputStreamFileInput utility
private class SingleFileProvider
implements InputStreamFileInput.Provider
{
private AmazonS3 client;
private final String bucket;
private final Iterator<String> iterator;
private final RetryExecutor retryExec;
public SingleFileProvider(PluginTask task, int taskIndex)
{
this.client = newS3Client(task);
this.bucket = task.getBucket();
this.iterator = task.getFiles().get(taskIndex).iterator();
this.retryExec = retryExecutorFrom(task);
}
@Override
public InputStream openNext() throws IOException
{
if (!iterator.hasNext()) {
return null;
}
final String key = iterator.next();
final GetObjectRequest request = new GetObjectRequest(bucket, key);
S3Object object = new DefaultRetryable<S3Object>(format("Getting object '%s'", request.getKey())) {
@Override
public S3Object call()
{
return client.getObject(request);
}
}.executeWithCheckedException(retryExec, IOException.class);
long objectSize = object.getObjectMetadata().getContentLength();
LOGGER.info("Open S3Object with bucket [{}], key [{}], with size [{}]", bucket, key, objectSize);
String metadata = key;
try
{
MetadataUtils metadataOb = new MetadataUtils();
metadataOb.saveMetadata(metadata);
}
catch (Exception e)
{
LOGGER.warn("[ERROR] Could not store metadata: " + metadata);
}
return new ResumableInputStream(object.getObjectContent(), new S3InputStreamReopener(client, request, objectSize, retryExec));
}
@Override
public void close()
{
}
}
}
package org.embulk.input.s3;
import com.amazonaws.AmazonServiceException;
import com.google.common.base.Throwables;
import org.apache.http.HttpStatus;
import org.embulk.spi.Exec;
import org.embulk.spi.util.RetryExecutor;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import static java.lang.String.format;
import static org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import static org.embulk.spi.util.RetryExecutor.Retryable;
/**
* Retryable utility that retries regardless of the exception thrown,
* and provides a default approach for exception propagation.
*/
class DefaultRetryable<T> implements Retryable<T>
{
private static final Logger log = Exec.getLogger(DefaultRetryable.class);
private static final Set<Integer> NONRETRYABLE_STATUS_CODES = new HashSet<Integer>(2);
private static final Set<String> NONRETRYABLE_ERROR_CODES = new HashSet<String>(1);
private String operationName;
private Callable<T> callable;
static {
NONRETRYABLE_STATUS_CODES.add(HttpStatus.SC_FORBIDDEN);
NONRETRYABLE_STATUS_CODES.add(HttpStatus.SC_METHOD_NOT_ALLOWED);
NONRETRYABLE_ERROR_CODES.add("ExpiredToken");
}
/**
* @param operationName the name referred to in log messages
*/
public DefaultRetryable(String operationName)
{
this.operationName = operationName;
}
/**
* @param operationName the name referred to in log messages
* @param callable the operation; either define it at construction time or override the call() method
*/
public DefaultRetryable(String operationName, Callable<T> callable)
{
this.operationName = operationName;
this.callable = callable;
}
public DefaultRetryable()
{
this("Anonymous operation");
}
public DefaultRetryable(Callable<T> callable)
{
this("Anonymous operation", callable);
}
@Override
public T call() throws Exception
{
if (callable != null) {
return callable.call();
}
else {
throw new IllegalStateException("Either override call() or construct with a Runnable");
}
}
@Override
public boolean isRetryableException(Exception exception)
{
// No retry on a subset of service exceptions
if (exception instanceof AmazonServiceException) {
AmazonServiceException ase = (AmazonServiceException) exception;
return !NONRETRYABLE_STATUS_CODES.contains(ase.getStatusCode()) && !NONRETRYABLE_ERROR_CODES.contains(ase.getErrorCode());
}
return true;
}
@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
{
String message = format("%s failed. Retrying %d/%d after %d seconds. Message: %s",
operationName, retryCount, retryLimit, retryWait / 1000, exception.getMessage());
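// Attach the full stack trace only when retryCount is a multiple of retryLimit
// (i.e. on the final attempt) to avoid flooding the log.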
if (retryCount % retryLimit == 0) {
log.warn(message, exception);
}
else {
log.warn(message);
}
}
@Override
public void onGiveup(Exception firstException, Exception lastException)
{
// Exceptions will be propagated, so handling is up to the caller; this is just a warning
log.warn("Giving up on retrying for {}, first exception is [{}], last exception is [{}]",
operationName, firstException.getMessage(), lastException.getMessage());
}
/**
* Runs itself via the supplied executor.
*
* This propagates all exceptions (as unchecked) and unwraps RetryGiveupException to its original cause.
* If the original exception is already a RuntimeException, it is propagated as is; if not, it is
* wrapped in a RuntimeException.
*
* For convenience, it executes normally, without retrying, when the executor is null.
*
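* A sketch under the assumption that {@code retryExec} is a configured RetryExecutor:
* <pre>{@code
* byte[] bytes = new DefaultRetryable<byte[]>("Downloading a chunk") {
*     public byte[] call() { return downloadChunk(); }  // downloadChunk() is hypothetical
* }.executeWith(retryExec);
* }</pre>
*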
* @throws RuntimeException the original cause
*/
public T executeWith(RetryExecutor executor)
{
if (executor == null) {
try {
return this.call();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
try {
return executor.runInterruptible(this);
}
catch (RetryGiveupException e) {
throw Throwables.propagate(e.getCause());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
/**
* Runs itself via the supplied executor.
*
* Same as `executeWith`, this propagates all original exceptions, but an exception of the
* `propagateAsIsException` type is re-thrown without being wrapped in a RuntimeException,
* whether it is checked or unchecked.
*
* For convenience, it executes normally, without retrying, when the executor is null.
*
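* A sketch that lets IOException escape unwrapped (names are illustrative):
* <pre>{@code
* InputStream in = new DefaultRetryable<InputStream>("Opening a stream") {
*     public InputStream call() throws IOException { return openStream(); }
* }.executeWithCheckedException(retryExec, IOException.class);
* }</pre>
*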
* @throws X whatever checked exception that you decided to propagate directly
* @throws RuntimeException wrap around whatever the original cause of failure (potentially thread interruption)
*/
public <X extends Throwable> T executeWithCheckedException(RetryExecutor executor,
Class<X> propagateAsIsException) throws X
{
if (executor == null) {
try {
return this.call();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
try {
return executor.runInterruptible(this);
}
catch (RetryGiveupException e) {
Throwables.propagateIfInstanceOf(e.getCause(), propagateAsIsException);
throw Throwables.propagate(e.getCause());
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
}
package org.embulk.input.s3;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigSource;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
// this class should be moved to embulk-core
public class FileList
{
public interface Task
{
@Config("path_match_pattern")
@ConfigDefault("\".*\"")
String getPathMatchPattern();
@Config("total_file_count_limit")
@ConfigDefault("2147483647")
int getTotalFileCountLimit();
// TODO support more algorithms to combine tasks
@Config("min_task_size")
@ConfigDefault("0")
long getMinTaskSize();
}
public static class Entry
{
private int index;
private long size;
@JsonCreator
public Entry(
@JsonProperty("index") int index,
@JsonProperty("size") long size)
{
this.index = index;
this.size = size;
}
@JsonProperty("index")
public int getIndex()
{
return index;
}
@JsonProperty("size")
public long getSize()
{
return size;
}
}
public static class Builder
{
private final ByteArrayOutputStream binary;
private final OutputStream stream;
private final List<Entry> entries = new ArrayList<>();
private String last = null;
private int limitCount = Integer.MAX_VALUE;
private long minTaskSize = 1;
private Pattern pathMatchPattern;
private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
public Builder(Task task)
{
this();
this.pathMatchPattern = Pattern.compile(task.getPathMatchPattern());
this.limitCount = task.getTotalFileCountLimit();
this.minTaskSize = task.getMinTaskSize();
}
public Builder(ConfigSource config)
{
this();
this.pathMatchPattern = Pattern.compile(config.get(String.class, "path_match_pattern", ".*"));
this.limitCount = config.get(int.class, "total_file_count_limit", Integer.MAX_VALUE);
this.minTaskSize = config.get(long.class, "min_task_size", 0L);
}
public Builder()
{
binary = new ByteArrayOutputStream();
try {
stream = new BufferedOutputStream(new GZIPOutputStream(binary));
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
public Builder limitTotalFileCount(int limitCount)
{
this.limitCount = limitCount;
return this;
}
public Builder minTaskSize(long bytes)
{
this.minTaskSize = bytes;
return this;
}
public Builder pathMatchPattern(String pattern)
{
this.pathMatchPattern = Pattern.compile(pattern);
return this;
}
public int size()
{
return entries.size();
}
public boolean needsMore()
{
return size() < limitCount;
}
// returns true if this file is used
public synchronized boolean add(String path, long size)
{
// TODO throw IllegalStateException if stream is already closed
if (!needsMore()) {
return false;
}
if (!pathMatchPattern.matcher(path).find()) {
return false;
}
int index = entries.size();
entries.add(new Entry(index, size));
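// Encode the path as a 4-byte (big-endian) length header followed by its UTF-8 bytes, appended to the gzipped stream.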
byte[] data = path.getBytes(StandardCharsets.UTF_8);
castBuffer.putInt(0, data.length);
try {
stream.write(castBuffer.array());
stream.write(data);
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
last = path;
return true;
}
public FileList build()
{
try {
stream.close();
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
return new FileList(binary.toByteArray(), getSplits(entries), Optional.fromNullable(last));
}
private List<List<Entry>> getSplits(List<Entry> all)
{
List<List<Entry>> tasks = new ArrayList<>();
long currentTaskSize = 0;
List<Entry> currentTask = new ArrayList<>();
for (Entry entry : all) {
currentTask.add(entry);
currentTaskSize += entry.getSize(); // TODO consider to multiply the size by cost_per_byte, and add cost_per_file
if (currentTaskSize >= minTaskSize) {
tasks.add(currentTask);
currentTask = new ArrayList<>();
currentTaskSize = 0;
}
}
if (!currentTask.isEmpty()) {
tasks.add(currentTask);
}
return tasks;
}
}
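// A minimal Builder usage sketch (paths and sizes are hypothetical): paths are filtered by
// the path match pattern, and build() groups them into tasks of at least minTaskSize bytes.
//
//   FileList.Builder builder = new FileList.Builder()
//           .pathMatchPattern("\\.csv$")
//           .minTaskSize(100 * 1024 * 1024);
//   builder.add("logs/2015-01-27.csv", 12345L);  // returns true if the path is accepted
//   FileList files = builder.build();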
private final byte[] data;
private final List<List<Entry>> tasks;
private final Optional<String> last;
@JsonCreator
@Deprecated
public FileList(
@JsonProperty("data") byte[] data,
@JsonProperty("tasks") List<List<Entry>> tasks,
@JsonProperty("last") Optional<String> last)
{
this.data = data;
this.tasks = tasks;
this.last = last;
}
@JsonIgnore
public Optional<String> getLastPath(Optional<String> lastLastPath)
{
if (last.isPresent()) {
return last;
}
return lastLastPath;
}
@JsonIgnore
public int getTaskCount()
{
return tasks.size();
}
@JsonIgnore
public List<String> get(int i)
{
return new EntryList(data, tasks.get(i));
}
@JsonProperty("data")
@Deprecated
public byte[] getData()
{
return data;
}
@JsonProperty("tasks")
@Deprecated
public List<List<Entry>> getTasks()
{
return tasks;
}
@JsonProperty("last")
@Deprecated
public Optional<String> getLast()
{
return last;
}
private class EntryList
extends AbstractList<String>
{
private final byte[] data;
private final List<Entry> entries;
private InputStream stream;
private int current;
private final ByteBuffer castBuffer = ByteBuffer.allocate(4);
public EntryList(byte[] data, List<Entry> entries)
{
this.data = data;
this.entries = entries;
try {
this.stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data)));
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
this.current = 0;
}
@Override
public synchronized String get(int i)
{
Entry e = entries.get(i);
if (e.getIndex() < current) {
// rewind to the head
try {
stream.close();
stream = new BufferedInputStream(new GZIPInputStream(new ByteArrayInputStream(data)));
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
current = 0;
}
while (current < e.getIndex()) {
readNext();
}
// now current == e.getIndex()
return readNextString();
}
@Override
public int size()
{
return entries.size();
}
private byte[] readNext()
{
try {
stream.read(castBuffer.array());
int n = castBuffer.getInt(0);
byte[] b = new byte[n]; // a pooled buffer could be used here, since the data is discarded when this method is called only to skip entries (i.e. not via readNextString)
stream.read(b);
current++;
return b;
}
catch (IOException ex) {
throw Throwables.propagate(ex);
}
}
private String readNextString()
{
return new String(readNext(), StandardCharsets.UTF_8);
}
}
}
package org.embulk.input.s3;
import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;
/**
* HttpProxy is config unit for Input/Output plugins' configs.
*
* TODO: This unit will be moved to embulk/embulk-plugin-units.git.
* TODO: Consider using @JsonProperty(defaultValue=...) in Jackson 2.6+.
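*
* A hedged YAML sketch of how this unit is expected to appear in a plugin config
* (the enclosing `http_proxy` key is an assumption based on the plugin task):
* <pre>
* http_proxy:
*   host: my-proxy.example.com
*   port: 8080
*   https: false
*   user: proxy_user
*   password: proxy_pass
* </pre>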
*/
public interface HttpProxy
extends Task
{
@Config("host")
public String getHost();
@Config("port")
@ConfigDefault("null")
public Optional<Integer> getPort();
@Config("https")
@ConfigDefault("true")
public boolean getHttps();
@Config("user")
@ConfigDefault("null")
public Optional<String> getUser();
@Config("password")
@ConfigDefault("null")
public Optional<String> getPassword();
}
package org.embulk.input.s3;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.Task;
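/**
 * Retry-related task settings for the S3 input plugins. A hedged YAML sketch using the
 * defaults declared below (key names follow the {@code @Config} values):
 *
 * <pre>
 * maximum_retries: 7
 * initial_retry_interval_millis: 30000
 * maximum_retry_interval_millis: 480000
 * </pre>
 */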
public interface RetrySupportPluginTask extends Task
{
@Config("maximum_retries")
@ConfigDefault("7")
int getMaximumRetries();
@Config("initial_retry_interval_millis")
@ConfigDefault("30000")
int getInitialRetryIntervalMillis();
@Config("maximum_retry_interval_millis")
@ConfigDefault("480000")
int getMaximumRetryIntervalMillis();
}
package org.embulk.input.s3;
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.google.common.base.Optional;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
public class S3FileInputPlugin
extends AbstractS3FileInputPlugin
{
public interface S3PluginTask
extends PluginTask
{
@Config("endpoint")
@ConfigDefault("null")
public Optional<String> getEndpoint();
@Config("region")
@ConfigDefault("null")
public Optional<String> getRegion();
}
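// A hedged config sketch for these options (values are illustrative); per newS3Client below,
// when both are present only the endpoint takes effect:
//
//   type: s3
//   endpoint: s3.ap-northeast-1.amazonaws.com
//   region: ap-northeast-1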
private static final Logger log = Exec.getLogger(S3FileInputPlugin.class);
@Override
protected Class<? extends PluginTask> getTaskClass()
{
return S3PluginTask.class;
}
@Override
protected AmazonS3 newS3Client(PluginTask task)
{
S3PluginTask t = (S3PluginTask) task;
Optional<String> endpoint = t.getEndpoint();
Optional<String> region = t.getRegion();
AmazonS3ClientBuilder builder = super.defaultS3ClientBuilder(t);
// Favor the `endpoint` configuration, then `region`, if both are absent then `s3.amazonaws.com` will be used.
if (endpoint.isPresent()) {
if (region.isPresent()) {
log.warn("Either configure endpoint or region, " +
"if both is specified only the endpoint will be in effect.");
}
builder.setEndpointConfiguration(new EndpointConfiguration(endpoint.get(), null));
}
else if (region.isPresent()) {
builder.setRegion(region.get());
}
else {
// This is to keep the AWS SDK upgrading to 1.11.x to be backward compatible with old configuration.
//
// On SDK 1.10.x, when neither endpoint nor region is set explicitly, the client's endpoint will be by
// default `s3.amazonaws.com`. And for pre-Signature-V4, this will work fine as the bucket's region
// will be resolved to the appropriate region on server (AWS) side.
//
// On SDK 1.11.x, a region will be computed on client side by AwsRegionProvider and the endpoint now will
// be region-specific `<region>.s3.amazonaws.com` and might be the wrong one.
//
// So defaulting the endpoint to `s3.amazonaws.com` when both the endpoint and region configs are
// absent is necessary so that old configurations won't suddenly break. The side effect is that this
// renders AwsRegionProvider useless. It's also worth noting that Signature V4 won't work on either
// version without an explicit region or endpoint, as the region (inferable from the endpoint) is necessary for signing.
builder.setEndpointConfiguration(new EndpointConfiguration("s3.amazonaws.com", null));
}
return builder.build();
}
}
package org.embulk.input;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.input.FtpFileInputPlugin.PluginTask;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.FileInputRunner;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.Schema;
import org.embulk.spi.TestPageBuilderReader;
import org.embulk.spi.TestPageBuilderReader.MockPageOutput;
import org.embulk.spi.util.Pages;
import org.embulk.standards.CsvParserPlugin;
import org.embulk.util.ftp.SSLPlugins;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
public class TestFtpFileInputPlugin
{
private static String FTP_TEST_HOST;
private static Integer FTP_TEST_PORT;
private static Integer FTP_TEST_SSL_PORT;
private static String FTP_TEST_USER;
private static String FTP_TEST_PASSWORD;
private static String FTP_TEST_SSL_TRUSTED_CA_CERT_FILE;
private static String FTP_TEST_SSL_TRUSTED_CA_CERT_DATA;
private static String FTP_TEST_DIRECTORY;
private static String FTP_TEST_PATH_PREFIX;
private FileInputRunner runner;
private TestPageBuilderReader.MockPageOutput output;
/*
* This test case requires environment variables
* FTP_TEST_HOST
* FTP_TEST_USER
* FTP_TEST_PASSWORD
* FTP_TEST_SSL_TRUSTED_CA_CERT_FILE
*/
@BeforeClass
public static void initializeConstant()
{
final Map<String, String> env = System.getenv();
FTP_TEST_HOST = env.getOrDefault("FTP_TEST_HOST", "localhost");
FTP_TEST_PORT = Integer.valueOf(env.getOrDefault("FTP_TEST_PORT", "11021"));
FTP_TEST_SSL_PORT = Integer.valueOf(env.getOrDefault("FTP_TEST_SSL_PORT", "990"));
FTP_TEST_USER = env.getOrDefault("FTP_TEST_USER", "scott");
FTP_TEST_PASSWORD = env.getOrDefault("FTP_TEST_PASSWORD", "tiger");
FTP_TEST_SSL_TRUSTED_CA_CERT_FILE = env.getOrDefault("FTP_TEST_SSL_TRUSTED_CA_CERT_FILE", "dummy");
FTP_TEST_SSL_TRUSTED_CA_CERT_DATA = env.getOrDefault("FTP_TEST_SSL_TRUSTED_CA_CERT_DATA", "dummy");
FTP_TEST_DIRECTORY = getDirectory(env.getOrDefault("FTP_TEST_DIRECTORY", "/unittest/"));
FTP_TEST_PATH_PREFIX = FTP_TEST_DIRECTORY + "sample_";
}
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private FtpFileInputPlugin plugin;
@Before
public void createResources()
{
plugin = new FtpFileInputPlugin();
runner = new FileInputRunner(runtime.getInstance(FtpFileInputPlugin.class));
output = new MockPageOutput();
}
@Test(expected = RuntimeException.class) // TODO ConfigException should be thrown
public void testTransactionWithInvalidHost()
{
ConfigSource config = config().deepCopy()
.set("host", "non-exists.example.com");
runner.transaction(config, new Control());
}
@Test
public void testResume()
{
PluginTask task = config().loadConfig(PluginTask.class);
task.setSSLConfig(sslConfig(task));
task.setFiles(Arrays.asList("in/aa/a"));
ConfigDiff configDiff = plugin.resume(task.dump(), 0, new FileInputPlugin.Control()
{
@Override
public List<TaskReport> run(TaskSource taskSource, int taskCount)
{
return emptyTaskReports(taskCount);
}
});
assertThat(configDiff.get(String.class, "last_path"), is("in/aa/a"));
}
@Test
public void testCleanup()
{
PluginTask task = config().loadConfig(PluginTask.class);
plugin.cleanup(task.dump(), 0, Lists.<TaskReport>newArrayList()); // no errors happen
}
@Test
@SuppressWarnings("unchecked")
public void testListFilesWithNonExistPath() throws Exception
{
ConfigSource config = config().deepCopy()
.set("path_prefix", "non-exists-path");
PluginTask task = config.loadConfig(PluginTask.class);
plugin.transaction(config, new FileInputPlugin.Control() {
@Override
public List<TaskReport> run(TaskSource taskSource, int taskCount)
{
assertThat(taskCount, is(0));
return emptyTaskReports(taskCount);
}
});
Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class);
method.setAccessible(true);
Logger logger = Exec.getLogger(FtpFileInputPlugin.class);
List<String> fileList = (List<String>) method.invoke(plugin, logger, task);
assertThat(fileList.size(), is(0));
}
@Test
@SuppressWarnings("unchecked")
public void testListFiles() throws Exception
{
List<String> expected = Arrays.asList(
FTP_TEST_PATH_PREFIX + "01.csv",
FTP_TEST_PATH_PREFIX + "02.csv"
);
ConfigSource config = config();
final PluginTask task = config.loadConfig(PluginTask.class);
ConfigDiff configDiff = plugin.transaction(config, new FileInputPlugin.Control() {
@Override
public List<TaskReport> run(TaskSource taskSource, int taskCount)
{
assertThat(taskCount, is(2));
return emptyTaskReports(taskCount);
}
});
Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class);
method.setAccessible(true);
Logger logger = Exec.getLogger(FtpFileInputPlugin.class);
List<String> fileList = (List<String>) method.invoke(plugin, logger, task);
assertThat(fileList.get(0), is(expected.get(0)));
assertThat(fileList.get(1), is(expected.get(1)));
assertThat(configDiff.get(String.class, "last_path"), is(FTP_TEST_PATH_PREFIX + "02.csv"));
}
@Test
public void testListFilesByPrefixIncrementalFalse()
{
ConfigSource config = config()
.deepCopy()
.set("incremental", false);
ConfigDiff configDiff = runner.transaction(config, new Control());
assertThat(configDiff.toString(), is("{}"));
}
@Test
@SuppressWarnings("unchecked")
public void testFtpFileInputByOpen() throws Exception
{
ConfigSource config = config();
PluginTask task = config().loadConfig(PluginTask.class);
runner.transaction(config, new Control());
Method method = FtpFileInputPlugin.class.getDeclaredMethod("listFiles", Logger.class, PluginTask.class);
method.setAccessible(true);
Logger logger = Exec.getLogger(FtpFileInputPlugin.class);
List<String> fileList = (List<String>) method.invoke(plugin, logger, task);
task.setFiles(fileList);
assertRecords(config, output);
}
private static List<TaskReport> emptyTaskReports(int taskCount)
{
ImmutableList.Builder<TaskReport> reports = new ImmutableList.Builder<>();
for (int i = 0; i < taskCount; i++) {
reports.add(Exec.newTaskReport());
}
return reports.build();
}
private class Control
implements InputPlugin.Control
{
@Override
public List<TaskReport> run(TaskSource taskSource, Schema schema, int taskCount)
{
List<TaskReport> reports = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
reports.add(runner.run(taskSource, schema, i, output));
}
return reports;
}
}
private ConfigSource config()
{
return Exec.newConfigSource()
.set("host", FTP_TEST_HOST)
.set("port", FTP_TEST_PORT)
.set("user", FTP_TEST_USER)
.set("password", FTP_TEST_PASSWORD)
.set("path_prefix", FTP_TEST_PATH_PREFIX)
.set("last_path", "")
.set("file_ext", ".csv")
.set("max_connection_retry", 3)
.set("ssl", false)
.set("ssl_verify", false)
.set("ssl_verify_hostname", false)
.set("ssl_trusted_ca_cert_data", FTP_TEST_SSL_TRUSTED_CA_CERT_DATA)
.set("parser", parserConfig(schemaConfig()));
}
private ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig)
{
ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
builder.put("type", "csv");
builder.put("newline", "CRLF");
builder.put("delimiter", ",");
builder.put("quote", "\"");
builder.put("escape", "\"");
builder.put("trim_if_not_quoted", false);
builder.put("skip_header_lines", 1);
builder.put("allow_extra_columns", false);
builder.put("allow_optional_columns", false);
builder.put("columns", schemaConfig);
return builder.build();
}
private ImmutableList<Object> schemaConfig()
{
ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>();
builder.add(ImmutableMap.of("name", "id", "type", "long"));
builder.add(ImmutableMap.of("name", "account", "type", "long"));
builder.add(ImmutableMap.of("name", "time", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S"));
builder.add(ImmutableMap.of("name", "purchase", "type", "timestamp", "format", "%Y%m%d"));
builder.add(ImmutableMap.of("name", "comment", "type", "string"));
return builder.build();
}
public SSLPlugins.SSLPluginConfig sslConfig(PluginTask task)
{
return SSLPlugins.configure(task);
}
private void assertRecords(ConfigSource config, MockPageOutput output)
{
List<Object[]> records = getRecords(config, output);
assertThat(records.size(), is(8));
{
Object[] record = records.get(0);
assertThat((long) record[0], is(1L));
assertThat((long) record[1], is(32864L));
assertThat(record[2].toString(), is("2015-01-27 19:23:49 UTC"));
assertThat(record[3].toString(), is("2015-01-27 00:00:00 UTC"));
assertThat(record[4].toString(), is("embulk"));
}
{
Object[] record = records.get(1);
assertThat((long) record[0], is(2L));
assertThat((long) record[1], is(14824L));
assertThat(record[2].toString(), is("2015-01-27 19:01:23 UTC"));
assertThat(record[3].toString(), is("2015-01-27 00:00:00 UTC"));
assertThat(record[4].toString(), is("embulk jruby"));
}
}
private List<Object[]> getRecords(ConfigSource config, MockPageOutput output)
{
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
return Pages.toObjects(schema, output.pages);
}
private static String getDirectory(String dir)
{
if (dir != null && !dir.endsWith("/")) {
dir = dir + "/";
}
if (dir != null && dir.startsWith("/")) {
dir = dir.replaceFirst("/", "");
}
return dir;
}
}
package org.embulk.input.http;
import com.google.common.base.Optional;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestPagerOption
{
@Test
public void testExpandFromTo() throws Exception
{
List<List<QueryOption.Query>> dest = new PagerOption("from", Optional.of("to"), Optional.of(1), 3,
Optional.of(2)).expand();
assertEquals(dest.size(), 3);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "from");
assertEquals(dest.get(0).get(0).getValues()[0], "1");
assertEquals(dest.get(0).get(1).getName(), "to");
assertEquals(dest.get(0).get(1).getValues()[0], "2");
assertEquals(dest.get(1).size(), 2);
assertEquals(dest.get(1).get(0).getName(), "from");
assertEquals(dest.get(1).get(0).getValues()[0], "3");
assertEquals(dest.get(1).get(1).getName(), "to");
assertEquals(dest.get(1).get(1).getValues()[0], "4");
assertEquals(dest.get(2).size(), 2);
assertEquals(dest.get(2).get(0).getName(), "from");
assertEquals(dest.get(2).get(0).getValues()[0], "5");
assertEquals(dest.get(2).get(1).getName(), "to");
assertEquals(dest.get(2).get(1).getValues()[0], "6");
}
@Test
public void testExpandFromToWithDefault() throws Exception
{
Optional<Integer> nullValue = Optional.absent();
List<List<QueryOption.Query>> dest = new PagerOption("from", Optional.of("to"), nullValue, 2, nullValue)
.expand();
assertEquals(dest.size(), 2);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "from");
assertEquals(dest.get(0).get(0).getValues()[0], "0");
assertEquals(dest.get(0).get(1).getName(), "to");
assertEquals(dest.get(0).get(1).getValues()[0], "0");
assertEquals(dest.get(1).size(), 2);
assertEquals(dest.get(1).get(0).getName(), "from");
assertEquals(dest.get(1).get(0).getValues()[0], "1");
assertEquals(dest.get(1).get(1).getName(), "to");
assertEquals(dest.get(1).get(1).getValues()[0], "1");
}
@Test
public void testExpandPagenate() throws Exception
{
Optional<String> nullValue = Optional.absent();
List<List<QueryOption.Query>> dest = new PagerOption("page", nullValue, Optional.of(1), 3,
Optional.of(1)).expand();
assertEquals(dest.size(), 3);
assertEquals(dest.get(0).size(), 1);
assertEquals(dest.get(0).get(0).getName(), "page");
assertEquals(dest.get(0).get(0).getValues()[0], "1");
assertEquals(dest.get(1).size(), 1);
assertEquals(dest.get(1).get(0).getName(), "page");
assertEquals(dest.get(1).get(0).getValues()[0], "2");
assertEquals(dest.get(2).size(), 1);
assertEquals(dest.get(2).get(0).getName(), "page");
assertEquals(dest.get(2).get(0).getValues()[0], "3");
}
}
package org.embulk.input.http;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestParamsOption
{
@Test
public void testUnexpandQueriesSinglePair() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption q1 = new QueryOption("test1", Optional.of("awasome1"), nullValues, false);
QueryOption q2 = new QueryOption("test2", Optional.of("awasome2"), nullValues, false);
ParamsOption paramsOption = new ParamsOption(Lists.newArrayList(q1, q2));
Optional<PagerOption> pagerOption = Optional.absent();
List<List<QueryOption.Query>> dest = paramsOption.generateQueries(pagerOption);
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "test1");
assertEquals(dest.get(0).get(0).getValues()[0], "awasome1");
assertEquals(dest.get(0).get(1).getName(), "test2");
assertEquals(dest.get(0).get(1).getValues()[0], "awasome2");
}
@Test
public void testUnexpandQueriesExpandPair() throws Exception
{
Optional<String> nullValue = Optional.absent();
List<String> values1 = Lists.newArrayList("a", "b");
List<String> values2 = Lists.newArrayList("c", "d");
QueryOption q1 = new QueryOption("test1", nullValue, Optional.of(values1), false);
QueryOption q2 = new QueryOption("test2", nullValue, Optional.of(values2), false);
ParamsOption paramsOption = new ParamsOption(Lists.newArrayList(q1, q2));
Optional<PagerOption> pagerOption = Optional.absent();
List<List<QueryOption.Query>> dest = paramsOption.generateQueries(pagerOption);
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "test1");
assertEquals(dest.get(0).get(0).getValues()[0], "a");
assertEquals(dest.get(0).get(0).getValues()[1], "b");
assertEquals(dest.get(0).get(1).getName(), "test2");
assertEquals(dest.get(0).get(1).getValues()[0], "c");
assertEquals(dest.get(0).get(1).getValues()[1], "d");
}
@Test
public void testExpandQueriesSinglePair() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption q1 = new QueryOption("test1", Optional.of("awasome1"), nullValues, true);
QueryOption q2 = new QueryOption("test2", Optional.of("awasome2"), nullValues, true);
ParamsOption paramsOption = new ParamsOption(Lists.newArrayList(q1, q2));
Optional<PagerOption> pagerOption = Optional.absent();
List<List<QueryOption.Query>> dest = paramsOption.generateQueries(pagerOption);
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "test1");
assertEquals(dest.get(0).get(0).getValues()[0], "awasome1");
assertEquals(dest.get(0).get(1).getName(), "test2");
assertEquals(dest.get(0).get(1).getValues()[0], "awasome2");
}
@Test
public void testExpandQueriesExpandPair() throws Exception
{
Optional<String> nullValue = Optional.absent();
List<String> values1 = Lists.newArrayList("a", "b");
List<String> values2 = Lists.newArrayList("c", "d");
QueryOption q1 = new QueryOption("test1", nullValue, Optional.of(values1), true);
QueryOption q2 = new QueryOption("test2", nullValue, Optional.of(values2), true);
ParamsOption paramsOption = new ParamsOption(Lists.newArrayList(q1, q2));
Optional<PagerOption> pagerOption = Optional.absent();
List<List<QueryOption.Query>> dest = paramsOption.generateQueries(pagerOption);
assertEquals(dest.size(), 4);
assertEquals(dest.get(0).size(), 2);
assertEquals(dest.get(0).get(0).getName(), "test1");
assertEquals(dest.get(0).get(0).getValues()[0], "a");
assertEquals(dest.get(0).get(1).getName(), "test2");
assertEquals(dest.get(0).get(1).getValues()[0], "c");
assertEquals(dest.get(1).size(), 2);
assertEquals(dest.get(1).get(0).getName(), "test1");
assertEquals(dest.get(1).get(0).getValues()[0], "b");
assertEquals(dest.get(1).get(1).getName(), "test2");
assertEquals(dest.get(1).get(1).getValues()[0], "c");
assertEquals(dest.get(2).size(), 2);
assertEquals(dest.get(2).get(0).getName(), "test1");
assertEquals(dest.get(2).get(0).getValues()[0], "a");
assertEquals(dest.get(2).get(1).getName(), "test2");
assertEquals(dest.get(2).get(1).getValues()[0], "d");
assertEquals(dest.get(3).size(), 2);
assertEquals(dest.get(3).get(0).getName(), "test1");
assertEquals(dest.get(3).get(0).getValues()[0], "b");
assertEquals(dest.get(3).get(1).getName(), "test2");
assertEquals(dest.get(3).get(1).getValues()[0], "d");
}
}
package org.embulk.input.http;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestQueryOption
{
@Test
public void testUnexpandSingleValue() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption config = new QueryOption("test", Optional.of("awesome"), nullValues, false);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 1);
assertEquals(dest.get(0).getValues()[0], "awesome");
}
@Test
public void testUnexpandMultiValue() throws Exception
{
Optional<String> nullValue = Optional.absent();
List<String> values = Lists.newArrayList("a", "b", "c");
QueryOption config = new QueryOption("test", nullValue, Optional.of(values), false);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 3);
assertEquals(dest.get(0).getValues()[0], "a");
assertEquals(dest.get(0).getValues()[1], "b");
assertEquals(dest.get(0).getValues()[2], "c");
}
@Test
public void testExpandSingleValue() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption config = new QueryOption("test", Optional.of("awesome"), nullValues, true);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues()[0], "awesome");
}
@Test
public void testExpandMultiValue() throws Exception
{
Optional<String> nullValue = Optional.absent();
List<String> values = Lists.newArrayList("a", "b", "c");
QueryOption config = new QueryOption("test", nullValue, Optional.of(values), true);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 3);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 1);
assertEquals(dest.get(0).getValues()[0], "a");
assertEquals(dest.get(1).getValues().length, 1);
assertEquals(dest.get(1).getName(), "test");
assertEquals(dest.get(1).getValues()[0], "b");
assertEquals(dest.get(2).getValues().length, 1);
assertEquals(dest.get(2).getName(), "test");
assertEquals(dest.get(2).getValues()[0], "c");
}
@Test(expected = IllegalArgumentException.class)
public void testExpandRaisesExceptionWhenBothValuesAreNull() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
Optional<String> nullValue = Optional.absent();
QueryOption config = new QueryOption("test", nullValue, nullValues, false);
config.expand();
}
@Test
public void testUnExpandBrace() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption config = new QueryOption("test", Optional.of("{awesome1,awesome2,awesome3}"), nullValues, false);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 1);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 1);
assertEquals(dest.get(0).getValues()[0], "{awesome1,awesome2,awesome3}");
}
@Test
public void testExpandBrace() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption config = new QueryOption("test", Optional.of("{awesome1,awesome2,awesome3}"), nullValues, true);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.size(), 3);
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 1);
assertEquals(dest.get(0).getValues()[0], "awesome1");
assertEquals(dest.get(1).getName(), "test");
assertEquals(dest.get(1).getValues().length, 1);
assertEquals(dest.get(1).getValues()[0], "awesome2");
assertEquals(dest.get(2).getValues().length, 1);
assertEquals(dest.get(2).getName(), "test");
assertEquals(dest.get(2).getValues()[0], "awesome3");
}
@Test
public void testExpandEscapedBrace() throws Exception
{
Optional<List<String>> nullValues = Optional.absent();
QueryOption config = new QueryOption("test", Optional.of("{awe\\,some1,awes\\{ome2,awes\\}ome3}"), nullValues, true);
List<QueryOption.Query> dest = config.expand();
assertEquals(dest.get(0).getName(), "test");
assertEquals(dest.get(0).getValues().length, 1);
assertEquals(dest.get(0).getValues()[0], "awe,some1");
assertEquals(dest.get(1).getName(), "test");
assertEquals(dest.get(1).getValues().length, 1);
assertEquals(dest.get(1).getValues()[0], "awes{ome2");
assertEquals(dest.get(2).getName(), "test");
assertEquals(dest.get(2).getValues().length, 1);
assertEquals(dest.get(2).getValues()[0], "awes}ome3");
}
}
package org.embulk.input.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.base.Optional;
import org.apache.http.HttpStatus;
import org.embulk.EmbulkTestRuntime;
import org.embulk.spi.util.RetryExecutor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
public class TestAbstractS3FileInputPlugin
{
private static RetryExecutor retryExecutor()
{
return RetryExecutor.retryExecutor()
.withInitialRetryWait(0)
.withMaxRetryWait(0);
}
private static AbstractS3FileInputPlugin dummyS3Plugin()
{
return new AbstractS3FileInputPlugin()
{
@Override
protected Class<? extends PluginTask> getTaskClass()
{
return PluginTask.class;
}
};
}
private static class SomeException extends RuntimeException
{
}
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private AmazonS3 client;
@Before
public void createResources()
{
client = mock(AmazonS3.class);
}
@Test
public void listS3FilesByPrefix()
{
doReturn(new ObjectListing()).when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true);
}
@Test
public void listS3FileByPrefix_with_retry()
{
doThrow(new RuntimeException()).doReturn(new ObjectListing())
.when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(
builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true,
retryExecutor().withRetryLimit(1));
}
@Test(expected = SomeException.class)
public void listS3FileByPrefix_on_retry_gave_up_should_throw_the_original_exception()
{
doThrow(new SomeException()).doReturn(new ObjectListing())
.when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(
builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true,
retryExecutor().withRetryLimit(0));
}
@Test(expected = AmazonServiceException.class)
public void listS3FileByPrefix_on_retry_gave_up_should_throw_the_original_exception_in_forbidden_code()
{
AmazonServiceException exception = new AmazonServiceException("Forbidden exception");
exception.setStatusCode(HttpStatus.SC_FORBIDDEN);
exception.setErrorType(AmazonServiceException.ErrorType.Client);
doThrow(exception).doReturn(new ObjectListing())
.when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(
builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true,
retryExecutor().withRetryLimit(1));
}
@Test(expected = AmazonServiceException.class)
public void listS3FileByPrefix_on_retry_gave_up_should_throw_the_original_exception_in_methodnotallow_code()
{
AmazonServiceException exception = new AmazonServiceException("method not allow exception");
exception.setStatusCode(HttpStatus.SC_METHOD_NOT_ALLOWED);
exception.setErrorType(AmazonServiceException.ErrorType.Client);
doThrow(exception).doReturn(new ObjectListing())
.when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(
builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true,
retryExecutor().withRetryLimit(1));
}
@Test(expected = AmazonServiceException.class)
public void listS3FileByPrefix_on_retry_gave_up_should_throw_the_original_exception_in_expiredToken_code()
{
AmazonServiceException exception = new AmazonServiceException("expired token exception");
exception.setStatusCode(HttpStatus.SC_BAD_REQUEST);
exception.setErrorCode("ExpiredToken");
exception.setErrorType(AmazonServiceException.ErrorType.Client);
doThrow(exception).doReturn(new ObjectListing())
.when(client).listObjects(any(ListObjectsRequest.class));
FileList.Builder builder = new FileList.Builder();
dummyS3Plugin().listS3FilesByPrefix(
builder, client, "some_bucket", "some_prefix", Optional.of("last_path"), true,
retryExecutor().withRetryLimit(1));
}
@Test
public void addS3DirectObject()
{
doReturn(new ObjectMetadata()).when(client).getObjectMetadata(any(GetObjectMetadataRequest.class));
FileList.Builder builder = new FileList.Builder().pathMatchPattern("");
dummyS3Plugin().addS3DirectObject(builder, client, "some_bucket", "some_prefix");
}
@Test
public void addS3DirectObject_with_retry()
{
doThrow(new RuntimeException()).doReturn(new ObjectMetadata())
.when(client).getObjectMetadata(any(GetObjectMetadataRequest.class));
FileList.Builder builder = new FileList.Builder().pathMatchPattern("");
dummyS3Plugin().addS3DirectObject(
builder, client, "some_bucket", "some_prefix",
retryExecutor());
}
@Test(expected = SomeException.class)
public void addS3DirectObject_on_retry_gave_up_should_throw_original_exception()
{
doThrow(new SomeException()).doReturn(new ObjectMetadata())
.when(client).getObjectMetadata(any(GetObjectMetadataRequest.class));
FileList.Builder builder = new FileList.Builder().pathMatchPattern("");
dummyS3Plugin().addS3DirectObject(
builder, client, "some_bucket", "some_prefix",
retryExecutor().withRetryLimit(0));
}
}
package org.embulk.input.s3;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.auth.policy.Policy;
import com.amazonaws.auth.policy.Resource;
import com.amazonaws.auth.policy.Statement;
import com.amazonaws.auth.policy.actions.S3Actions;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetFederationTokenRequest;
import com.amazonaws.services.securitytoken.model.GetFederationTokenResult;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.input.s3.TestS3FileInputPlugin.Control;
import org.embulk.spi.FileInputRunner;
import org.embulk.spi.TestPageBuilderReader;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import static org.embulk.input.s3.TestS3FileInputPlugin.assertRecords;
import static org.embulk.input.s3.TestS3FileInputPlugin.parserConfig;
import static org.embulk.input.s3.TestS3FileInputPlugin.schemaConfig;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeNotNull;
public class TestAwsCredentials
{
private static String EMBULK_S3_TEST_BUCKET;
private static String EMBULK_S3_TEST_ACCESS_KEY_ID;
private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY;
private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test";
/*
* This test case requires environment variables:
* EMBULK_S3_TEST_BUCKET
* EMBULK_S3_TEST_ACCESS_KEY_ID
* EMBULK_S3_TEST_SECRET_ACCESS_KEY
* If the variables are not set, the test case is skipped.
*/
@BeforeClass
public static void initializeConstantVariables()
{
EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET");
EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID");
EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY");
assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY);
}
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ConfigSource config;
private FileInputRunner runner;
private TestPageBuilderReader.MockPageOutput output;
@Before
public void createResources()
{
config = runtime.getExec().newConfigSource()
.set("type", "s3")
.set("bucket", EMBULK_S3_TEST_BUCKET)
.set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX)
.set("parser", parserConfig(schemaConfig()));
runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class));
output = new TestPageBuilderReader.MockPageOutput();
}
private void doTest(ConfigSource config)
{
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
assertRecords(config, output);
}
@Test
public void useBasic()
{
ConfigSource config = this.config.deepCopy()
.set("auth_method", "basic")
.set("access_key_id", EMBULK_S3_TEST_ACCESS_KEY_ID)
.set("secret_access_key", EMBULK_S3_TEST_SECRET_ACCESS_KEY);
doTest(config);
}
@Test
public void useEnv()
{
// TODO
}
@Test
public void useInstance()
{
// TODO
}
@Test
public void useProfile()
{
// TODO
}
@Test
public void useProperties()
{
String origAccessKeyId = System.getProperty("aws.accessKeyId");
String origSecretKey = System.getProperty("aws.secretKey");
try {
ConfigSource config = this.config.deepCopy().set("auth_method", "properties");
System.setProperty("aws.accessKeyId", EMBULK_S3_TEST_ACCESS_KEY_ID);
System.setProperty("aws.secretKey", EMBULK_S3_TEST_SECRET_ACCESS_KEY);
doTest(config);
}
finally {
if (origAccessKeyId != null) {
System.setProperty("aws.accessKeyId", origAccessKeyId);
}
if (origSecretKey != null) {
System.setProperty("aws.secretKey", origAccessKeyId);
}
}
}
@Test
public void useAnonymous()
{
// TODO
}
@Test
public void useSession()
{
BasicSessionCredentials sessionCredentials = getSessionCredentials();
ConfigSource config = this.config.deepCopy()
.set("auth_method", "session")
.set("access_key_id", sessionCredentials.getAWSAccessKeyId())
.set("secret_access_key", sessionCredentials.getAWSSecretKey())
.set("session_token", sessionCredentials.getSessionToken());
doTest(config);
}
private static BasicSessionCredentials getSessionCredentials()
{
AWSSecurityTokenService stsClient = AWSSecurityTokenServiceClientBuilder.standard().withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY))
).build();
GetFederationTokenRequest getFederationTokenRequest = new GetFederationTokenRequest();
getFederationTokenRequest.setDurationSeconds(7200);
getFederationTokenRequest.setName("dummy");
Policy policy = new Policy().withStatements(new Statement(Statement.Effect.Allow)
.withActions(S3Actions.ListObjects, S3Actions.GetObject)
.withResources(
new Resource("arn:aws:s3:::" + EMBULK_S3_TEST_BUCKET + "/" + EMBULK_S3_TEST_PATH_PREFIX + "/*"),
new Resource("arn:aws:s3:::" + EMBULK_S3_TEST_BUCKET)));
getFederationTokenRequest.setPolicy(policy.toJson());
GetFederationTokenResult federationTokenResult = stsClient.getFederationToken(getFederationTokenRequest);
Credentials sessionCredentials = federationTokenResult.getCredentials();
return new BasicSessionCredentials(
sessionCredentials.getAccessKeyId(),
sessionCredentials.getSecretAccessKey(),
sessionCredentials.getSessionToken());
}
}
package org.embulk.input.s3;
import org.embulk.EmbulkTestRuntime;
import org.embulk.spi.util.RetryExecutor;
import org.embulk.spi.util.RetryExecutor.RetryGiveupException;
import org.junit.Rule;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.Callable;
import static java.lang.String.format;
import static org.msgpack.core.Preconditions.checkArgument;
public class TestDefaultRetryable
{
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); // require for DefaultRetryable's logger
private static class Deny extends RuntimeException implements Callable
{
private int pastCalls = 0;
private final int targetCalls;
private Exception exception;
Deny(int targetCalls)
{
super(format("Try harder! (Will pass after %d calls)", targetCalls));
checkArgument(targetCalls >= 0);
this.targetCalls = targetCalls;
}
static Deny until(int calls)
{
return new Deny(calls);
}
Deny with(Exception exception)
{
this.exception = exception;
return this;
}
@Override
public Object call() throws Exception
{
if (pastCalls < targetCalls) {
pastCalls++;
if (exception != null) {
throw exception;
}
else {
throw this;
}
}
pastCalls++;
return null;
}
}
private static RetryExecutor retryExecutor()
{
return RetryExecutor.retryExecutor()
.withInitialRetryWait(0)
.withMaxRetryWait(0);
}
@Test
@SuppressWarnings("unchecked")
public void guarantee_retry_attempts_just_like_Retryable() throws Exception
{
retryExecutor()
.withRetryLimit(0)
.run(new DefaultRetryable(Deny.until(0)));
retryExecutor()
.withRetryLimit(1)
.run(new DefaultRetryable(Deny.until(1)));
retryExecutor()
.withRetryLimit(2)
.run(new DefaultRetryable(Deny.until(1)));
retryExecutor()
.withRetryLimit(3)
.run(new DefaultRetryable(Deny.until(2)));
}
@Test(expected = RetryGiveupException.class)
@SuppressWarnings("unchecked")
public void fail_after_exceeding_attempts_just_like_Retryable() throws Exception
{
retryExecutor()
.withRetryLimit(3)
.run(new DefaultRetryable(Deny.until(4)));
}
@Test(expected = Deny.class)
@SuppressWarnings("unchecked")
public void execute_should_unwrap_RetryGiveupException() throws Exception
{
new DefaultRetryable(Deny.until(4))
.executeWith(retryExecutor().withRetryLimit(3));
}
@Test(expected = RuntimeException.class)
@SuppressWarnings("unchecked")
public void execute_should_unwrap_RetryGiveupException_but_rewrap_checked_exception_in_a_RuntimeException()
{
new DefaultRetryable(Deny.until(4).with(new Exception("A checked exception")))
.executeWith(retryExecutor().withRetryLimit(3));
}
@Test(expected = IOException.class)
public void executeAndPropagateAsIs_should_leave_original_exception_unwrapped() throws IOException
{
RetryExecutor retryExc = retryExecutor().withRetryLimit(3);
// An explicit type parameter for the operation return type is needed here.
// Without one, javac (at least on 1.8) fails to infer the X exception type parameter.
new DefaultRetryable<Object>() {
@Override
public Object call() throws IOException
{
throw new IOException();
}
}.executeWithCheckedException(retryExc, IOException.class);
}
@Test(expected = IllegalStateException.class)
public void execute_without_an_implementation_should_throw_an_IllegalStateException()
{
new DefaultRetryable().executeWith(retryExecutor());
}
}
package org.embulk.input.s3;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigSource;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class TestFileList
{
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ConfigSource config;
@Before
public void createConfigSource()
{
config = runtime.getExec().newConfigSource();
}
@Test
public void checkMinTaskSize()
throws Exception
{
{ // not specify min_task_size
FileList fileList = newFileList(config.deepCopy(),
"sample_00", 100L,
"sample_01", 150L,
"sample_02", 350L);
assertEquals(3, fileList.getTaskCount());
assertEquals("sample_00", fileList.get(0).get(0));
assertEquals("sample_01", fileList.get(1).get(0));
assertEquals("sample_02", fileList.get(2).get(0));
}
{
FileList fileList = newFileList(config.deepCopy().set("min_task_size", 100),
"sample_00", 100L,
"sample_01", 150L,
"sample_02", 350L);
assertEquals(3, fileList.getTaskCount());
assertEquals("sample_00", fileList.get(0).get(0));
assertEquals("sample_01", fileList.get(1).get(0));
assertEquals("sample_02", fileList.get(2).get(0));
}
{
FileList fileList = newFileList(config.deepCopy().set("min_task_size", 200),
"sample_00", 100L,
"sample_01", 150L,
"sample_02", 350L);
assertEquals(2, fileList.getTaskCount());
assertEquals("sample_00", fileList.get(0).get(0));
assertEquals("sample_01", fileList.get(0).get(1));
assertEquals("sample_02", fileList.get(1).get(0));
}
{
FileList fileList = newFileList(config.deepCopy().set("min_task_size", 700),
"sample_00", 100L,
"sample_01", 150L,
"sample_02", 350L);
assertEquals(1, fileList.getTaskCount());
assertEquals("sample_00", fileList.get(0).get(0));
assertEquals("sample_01", fileList.get(0).get(1));
assertEquals("sample_02", fileList.get(0).get(2));
}
}
private static FileList newFileList(ConfigSource config, Object... nameAndSize)
{
FileList.Builder builder = new FileList.Builder(config);
for (int i = 0; i < nameAndSize.length; i += 2) {
builder.add((String) nameAndSize[i], (long) nameAndSize[i + 1]);
}
return builder.build();
}
}
package org.embulk.input.s3;
import com.google.common.base.Optional;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigSource;
import org.embulk.input.s3.S3FileInputPlugin.S3PluginTask;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestHttpProxy
{
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ConfigSource config;
@Before
public void createResources()
{
config = runtime.getExec().newConfigSource();
setupS3Config(config);
}
@Test
public void checkDefaultHttpProxy()
{
ConfigSource conf = config.deepCopy();
setupS3Config(conf);
S3PluginTask task = conf.loadConfig(S3PluginTask.class);
assertTrue(!task.getHttpProxy().isPresent());
}
@Test
public void checkHttpProxy()
{
{ // specify host
String host = "my_host";
ConfigSource conf = config.deepCopy().set("host", host);
HttpProxy httpProxy = conf.loadConfig(HttpProxy.class);
assertHttpProxy(host, Optional.<Integer>absent(), true, Optional.<String>absent(), Optional.<String>absent(),
httpProxy);
}
{ // specify https=true explicitly
String host = "my_host";
ConfigSource conf = config.deepCopy()
.set("host", host)
.set("https", true);
HttpProxy httpProxy = conf.loadConfig(HttpProxy.class);
assertHttpProxy(host, Optional.<Integer>absent(), true, Optional.<String>absent(), Optional.<String>absent(),
httpProxy);
}
{ // specify https=false
String host = "my_host";
ConfigSource conf = config.deepCopy()
.set("host", host)
.set("https", false);
HttpProxy httpProxy = conf.loadConfig(HttpProxy.class);
assertHttpProxy(host, Optional.<Integer>absent(), false, Optional.<String>absent(), Optional.<String>absent(),
httpProxy);
}
{ // specify host, port
String host = "my_host";
int port = 8080;
ConfigSource conf = config.deepCopy()
.set("host", host)
.set("port", port);
HttpProxy httpProxy = conf.loadConfig(HttpProxy.class);
assertHttpProxy(host, Optional.of(port), true, Optional.<String>absent(), Optional.<String>absent(),
httpProxy);
}
{ // specify host, port, user, password
String host = "my_host";
int port = 8080;
String user = "my_user";
String password = "my_pass";
ConfigSource conf = config.deepCopy()
.set("host", host)
.set("port", port)
.set("user", user)
.set("password", password);
HttpProxy httpProxy = conf.loadConfig(HttpProxy.class);
assertHttpProxy(host, Optional.of(port), true, Optional.of(user), Optional.of(password),
httpProxy);
}
}
private static void setupS3Config(ConfigSource config)
{
config.set("bucket", "my_bucket").set("path_prefix", "my_path_prefix");
}
private static void assertHttpProxy(String host, Optional<Integer> port, boolean https, Optional<String> user, Optional<String> password,
HttpProxy actual)
{
assertEquals(host, actual.getHost());
assertEquals(port.isPresent(), actual.getPort().isPresent());
if (port.isPresent()) {
assertEquals(port.get(), actual.getPort().get());
}
assertEquals(https, actual.getHttps());
assertEquals(user.isPresent(), actual.getUser().isPresent());
if (user.isPresent()) {
assertEquals(user.get(), actual.getUser().get());
}
assertEquals(password.isPresent(), actual.getPassword().isPresent());
if (password.isPresent()) {
assertEquals(password.get(), actual.getPassword().get());
}
}
}
package org.embulk.input.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.Region;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.FileInputRunner;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.TestPageBuilderReader.MockPageOutput;
import org.embulk.spi.util.Pages;
import org.embulk.standards.CsvParserPlugin;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import static org.embulk.input.s3.S3FileInputPlugin.S3PluginTask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assume.assumeNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class TestS3FileInputPlugin
{
private static String EMBULK_S3_TEST_BUCKET;
private static String EMBULK_S3_TEST_ACCESS_KEY_ID;
private static String EMBULK_S3_TEST_SECRET_ACCESS_KEY;
private static final String EMBULK_S3_TEST_PATH_PREFIX = "embulk_input_s3_test";
/*
* This test case requires environment variables:
* EMBULK_S3_TEST_BUCKET
* EMBULK_S3_TEST_ACCESS_KEY_ID
* EMBULK_S3_TEST_SECRET_ACCESS_KEY
* If the variables are not set, the test cases are skipped.
*/
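// For example, before running the tests (hypothetical values, not real credentials):
//   export EMBULK_S3_TEST_BUCKET=my-test-bucket
//   export EMBULK_S3_TEST_ACCESS_KEY_ID=AKIAXXXXXXXXXXXXXXXX
//   export EMBULK_S3_TEST_SECRET_ACCESS_KEY=xxxxxxxxxxxxxxxx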
@BeforeClass
public static void initializeConstantVariables()
{
EMBULK_S3_TEST_BUCKET = System.getenv("EMBULK_S3_TEST_BUCKET");
EMBULK_S3_TEST_ACCESS_KEY_ID = System.getenv("EMBULK_S3_TEST_ACCESS_KEY_ID");
EMBULK_S3_TEST_SECRET_ACCESS_KEY = System.getenv("EMBULK_S3_TEST_SECRET_ACCESS_KEY");
assumeNotNull(EMBULK_S3_TEST_BUCKET, EMBULK_S3_TEST_ACCESS_KEY_ID, EMBULK_S3_TEST_SECRET_ACCESS_KEY);
}
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private ConfigSource config;
private FileInputRunner runner;
private MockPageOutput output;
@Before
public void createResources()
{
config = runtime.getExec().newConfigSource()
.set("type", "s3")
.set("bucket", EMBULK_S3_TEST_BUCKET)
.set("access_key_id", EMBULK_S3_TEST_ACCESS_KEY_ID)
.set("secret_access_key", EMBULK_S3_TEST_SECRET_ACCESS_KEY)
.set("path_prefix", EMBULK_S3_TEST_PATH_PREFIX)
.set("parser", parserConfig(schemaConfig()));
runner = new FileInputRunner(runtime.getInstance(S3FileInputPlugin.class));
output = new MockPageOutput();
}
@Test
public void simpleTest()
{
ConfigSource config = this.config.deepCopy();
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
assertRecords(config, output);
}
@Test
public void useLastPath()
throws Exception
{
ConfigSource config = this.config.deepCopy().set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv");
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
assertEquals(0, getRecords(config, output).size());
}
@Test
public void useIncremental()
{
ConfigSource config = this.config.deepCopy().set("incremental", false);
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertFalse(configDiff.has("last_path"));
}
@Test
public void emptyFilesWithLastPath()
throws Exception
{
ConfigSource config = this.config.deepCopy()
.set("path_prefix", "empty_files_prefix")
.set("last_path", EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv");
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path")); // keep the last_path
assertEquals(0, getRecords(config, output).size());
}
@Test
public void useTotalFileCountLimit()
throws Exception
{
ConfigSource config = this.config.deepCopy().set("total_file_count_limit", 0);
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertNull(configDiff.get(String.class, "last_path"));
assertEquals(0, getRecords(config, output).size());
}
@Test
public void usePathMatchPattern()
throws Exception
{
{ // match pattern
ConfigSource config = this.config.deepCopy().set("path_match_pattern", "/sample_01");
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(EMBULK_S3_TEST_PATH_PREFIX + "/sample_01.csv", configDiff.get(String.class, "last_path"));
assertRecords(config, output);
}
output = new MockPageOutput();
{ // not match pattern
ConfigSource config = this.config.deepCopy().set("path_match_pattern", "/match/");
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertNull(configDiff.get(String.class, "last_path"));
assertEquals(0, getRecords(config, output).size());
}
}
@Test
public void usePath()
{
ConfigSource config = this.config.deepCopy()
.set("path", String.format("%s/sample_01.csv", EMBULK_S3_TEST_PATH_PREFIX))
.set("path_prefix", null);
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(String.format("%s/sample_01.csv", EMBULK_S3_TEST_PATH_PREFIX), configDiff.get(String.class, "last_path"));
assertRecords(config, output);
}
@Test
public void usePathAsHighPriorityThanPathPrefix()
{
ConfigSource config = this.config.deepCopy()
.set("path", String.format("%s/sample_01.csv", EMBULK_S3_TEST_PATH_PREFIX))
.set("path_prefix", "foo"); // path_prefix has the bad value, if path_prefix is chosen, expected result will be failed
ConfigDiff configDiff = runner.transaction(config, new Control(runner, output));
assertEquals(String.format("%s/sample_01.csv", EMBULK_S3_TEST_PATH_PREFIX), configDiff.get(String.class, "last_path"));
assertRecords(config, output);
}
@Test
public void configuredEndpoint()
{
S3PluginTask task = config.deepCopy()
.set("endpoint", "s3-ap-southeast-1.amazonaws.com")
.set("region", "ap-southeast-2")
.loadConfig(S3PluginTask.class);
S3FileInputPlugin plugin = runtime.getInstance(S3FileInputPlugin.class);
AmazonS3 s3Client = plugin.newS3Client(task);
// Should not crash, and should favor the endpoint over the region configuration (a warning is logged, though)
assertEquals(s3Client.getRegion(), Region.AP_Singapore);
}
@Test
public void configuredRegion()
{
S3PluginTask task = config.deepCopy()
.set("region", "ap-southeast-2")
.remove("endpoint")
.loadConfig(S3PluginTask.class);
S3FileInputPlugin plugin = runtime.getInstance(S3FileInputPlugin.class);
AmazonS3 s3Client = plugin.newS3Client(task);
// Should reflect the region configuration as is
assertEquals(s3Client.getRegion(), Region.AP_Sydney);
}
@Test
public void unconfiguredEndpointAndRegion()
{
S3PluginTask task = config.deepCopy()
.remove("endpoint")
.remove("region")
.loadConfig(S3PluginTask.class);
S3FileInputPlugin plugin = runtime.getInstance(S3FileInputPlugin.class);
AmazonS3 s3Client = plugin.newS3Client(task);
// US Standard region is a 'generic' one (s3.amazonaws.com); the expectation here is that
// the S3 client does not eagerly resolve a specific region on the client side.
// Please refer to org.embulk.input.s3.S3FileInputPlugin#newS3Client for the details.
assertEquals(s3Client.getRegion(), Region.US_Standard);
}
@Test(expected = ConfigException.class)
public void useSkipGlacierObjects() throws Exception
{
AmazonS3 client = mock(AmazonS3.class);
doReturn(s3objectList("in/aa/a", StorageClass.Glacier)).when(client).listObjects(any(ListObjectsRequest.class));
AbstractS3FileInputPlugin plugin = Mockito.mock(AbstractS3FileInputPlugin.class, Mockito.CALLS_REAL_METHODS);
plugin.listS3FilesByPrefix(newFileList(config, "sample_00", 100L), client, "test_bucket", "test_prefix", Optional.<String>absent(), false);
}
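// Hedged companion sketch (not part of the original suite): with the last argument
// (skipGlacierObjects) set to true, the same Glacier-class listing is assumed to be
// skipped silently instead of raising ConfigException. Names mirror the test above.
@Test
public void skipGlacierObjectsWhenConfigured() throws Exception
{
AmazonS3 client = mock(AmazonS3.class);
doReturn(s3objectList("in/aa/a", StorageClass.Glacier)).when(client).listObjects(any(ListObjectsRequest.class));
AbstractS3FileInputPlugin plugin = Mockito.mock(AbstractS3FileInputPlugin.class, Mockito.CALLS_REAL_METHODS);
// skipGlacierObjects = true: no exception expected; the Glacier object is ignored
plugin.listS3FilesByPrefix(newFileList(config, "sample_00", 100L), client, "test_bucket", "test_prefix", Optional.<String>absent(), true);
}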
private FileList.Builder newFileList(ConfigSource config, Object... nameAndSize)
{
FileList.Builder builder = new FileList.Builder(config);
for (int i = 0; i < nameAndSize.length; i += 2) {
builder.add((String) nameAndSize[i], (long) nameAndSize[i + 1]);
}
return builder;
}
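// ObjectListing offers no public way to inject object summaries in this SDK version (an
// assumption based on the reflection below), so the objectSummaries field is set directly.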
private ObjectListing s3objectList(String key, StorageClass storageClass) throws Exception
{
ObjectListing list = new ObjectListing();
S3ObjectSummary element = new S3ObjectSummary();
element.setKey(key);
element.setStorageClass(storageClass.toString());
List<S3ObjectSummary> objectSummaries = new ArrayList<>();
objectSummaries.add(element);
Field field = list.getClass().getDeclaredField("objectSummaries");
field.setAccessible(true);
field.set(list, objectSummaries);
return list;
}
static class Control
implements InputPlugin.Control
{
private FileInputRunner runner;
private PageOutput output;
Control(FileInputRunner runner, PageOutput output)
{
this.runner = runner;
this.output = output;
}
@Override
public List<TaskReport> run(TaskSource taskSource, Schema schema, int taskCount)
{
List<TaskReport> reports = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
reports.add(runner.run(taskSource, schema, i, output));
}
return reports;
}
}
static ImmutableMap<String, Object> parserConfig(ImmutableList<Object> schemaConfig)
{
ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
builder.put("type", "csv");
builder.put("newline", "CRLF");
builder.put("delimiter", ",");
builder.put("quote", "\"");
builder.put("escape", "\"");
builder.put("trim_if_not_quoted", false);
builder.put("skip_header_lines", 0);
builder.put("allow_extra_columns", false);
builder.put("allow_optional_columns", false);
builder.put("columns", schemaConfig);
return builder.build();
}
static ImmutableList<Object> schemaConfig()
{
ImmutableList.Builder<Object> builder = new ImmutableList.Builder<>();
builder.add(ImmutableMap.of("name", "timestamp", "type", "timestamp", "format", "%Y-%m-%d %H:%M:%S"));
builder.add(ImmutableMap.of("name", "host", "type", "string"));
builder.add(ImmutableMap.of("name", "path", "type", "string"));
builder.add(ImmutableMap.of("name", "method", "type", "string"));
builder.add(ImmutableMap.of("name", "referer", "type", "string"));
builder.add(ImmutableMap.of("name", "code", "type", "long"));
builder.add(ImmutableMap.of("name", "agent", "type", "string"));
builder.add(ImmutableMap.of("name", "user", "type", "string"));
builder.add(ImmutableMap.of("name", "size", "type", "long"));
return builder.build();
}
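// The expected values below assume the fixture sample_01.csv under the test prefix holds
// the two access-log rows asserted here (an assumption recovered from the assertions).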
static void assertRecords(ConfigSource config, MockPageOutput output)
{
List<Object[]> records = getRecords(config, output);
assertEquals(2, records.size());
{
Object[] record = records.get(0);
assertEquals("2014-10-02 22:15:39 UTC", record[0].toString());
assertEquals("84.186.29.187", record[1]);
assertEquals("/category/electronics", record[2]);
assertEquals("GET", record[3]);
assertEquals("/category/music", record[4]);
assertEquals(200L, record[5]);
assertEquals("Mozilla/5.0", record[6]);
assertEquals("-", record[7]);
assertEquals(136L, record[8]);
}
{
Object[] record = records.get(1);
assertEquals("2014-10-02 22:15:01 UTC", record[0].toString());
assertEquals("140.36.216.47", record[1]);
assertEquals("/category/music?from=10", record[2]);
assertEquals("GET", record[3]);
assertEquals("-", record[4]);
assertEquals(200L, record[5]);
assertEquals("Mozilla/5.0", record[6]);
assertEquals("-", record[7]);
assertEquals(70L, record[8]);
}
}
static List<Object[]> getRecords(ConfigSource config, MockPageOutput output)
{
Schema schema = config.getNested("parser").loadConfig(CsvParserPlugin.PluginTask.class).getSchemaConfig().toSchema();
return Pages.toObjects(schema, output.pages);
}
}
package org.embulk.input.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.embulk.EmbulkTestRuntime;
import org.embulk.input.s3.AbstractS3FileInputPlugin.S3InputStreamReopener;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
public class TestS3InputStreamReopener
{
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();
private AmazonS3 client;
@Before
public void createResources()
{
client = mock(AmazonS3.class);
}
@Test
public void reopenS3FileByReopener()
throws Exception
{
String content = "value";
{ // not retry
doReturn(s3object("in/aa/a", content)).when(client).getObject(any(GetObjectRequest.class));
S3InputStreamReopener opener = new S3InputStreamReopener(client, new GetObjectRequest("my_bucket", "in/aa/a"), content.length());
try (InputStream in = opener.reopen(0, new RuntimeException())) {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
assertEquals("value", r.readLine());
}
}
{ // retry once
doThrow(new RuntimeException()).doReturn(s3object("in/aa/a", content)).when(client).getObject(any(GetObjectRequest.class));
S3InputStreamReopener opener = new S3InputStreamReopener(
client,
new GetObjectRequest("my_bucket", "in/aa/a"),
content.length(),
retryExecutor()
.withInitialRetryWait(0)
.withRetryLimit(1));
try (InputStream in = opener.reopen(0, new RuntimeException())) {
BufferedReader r = new BufferedReader(new InputStreamReader(in));
assertEquals("value", r.readLine());
}
}
}
@Test(expected = AmazonClientException.class)
public void reopenS3FileByReopener_on_retry_gave_up_should_throw_original_exception() throws Exception
{
String content = "value";
doThrow(new AmazonClientException("no")).doReturn(s3object("in/aa/a", content)).when(client).getObject(any(GetObjectRequest.class));
S3InputStreamReopener opener = new S3InputStreamReopener(
client,
new GetObjectRequest("my_bucket", "in/aa/a"),
content.length(),
retryExecutor()
.withInitialRetryWait(0)
.withRetryLimit(0));
opener.reopen(0, new RuntimeException());
}
@Test(expected = AmazonClientException.class)
public void reopenS3FileByReopener_on_retry_always_throw_exception()
throws Exception
{
// the call always fails, even with 2 retries
doThrow(new AmazonClientException("This exception is thrown when retrying.")).when(client).getObject(any(GetObjectRequest.class));
S3InputStreamReopener opener = new S3InputStreamReopener(
client,
new GetObjectRequest("my_bucket", "in/aa/a"),
"value".length(),
retryExecutor()
.withInitialRetryWait(0)
.withRetryLimit(2));
try (InputStream in = opener.reopen(0, new AmazonClientException("This exception can be ignored"))) {
fail("Should throw exception.");
}
}
static S3Object s3object(String key, String value)
{
S3Object o = new S3Object();
o.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(value.getBytes()), null));
ObjectMetadata om = new ObjectMetadata();
om.setContentLength(value.length());
o.setObjectMetadata(om);
return o;
}
}