19 #include <aws/core/Aws.h>
20 #include <aws/core/auth/AWSCredentialsProvider.h>
21 #include <aws/core/auth/AWSCredentialsProviderChain.h>
22 #include <aws/s3/model/GetObjectRequest.h>
23 #include <aws/s3/model/ListObjectsV2Request.h>
24 #include <aws/s3/model/Object.h>
26 #include <boost/filesystem.hpp>
38 const std::string& env_variable_name) {
40 if (param.empty() && (0 != (env = getenv(env_variable_name.c_str())))) {
49 if (!boost::filesystem::is_directory(
s3_temp_dir)) {
50 throw std::runtime_error(
"failed to create s3_temp_dir directory '" +
s3_temp_dir +
64 Aws::S3::Model::ListObjectsV2Request objects_request;
67 objects_request.SetMaxKeys(1 << 20);
74 throw std::runtime_error(
75 "Required parameter \"s3_region\" not set. Please specify the \"s3_region\" "
76 "configuration parameter.");
84 Aws::Client::ClientConfiguration s3_config;
88 s3_config.caPath = ssl_config.ca_path;
89 s3_config.caFile = ssl_config.ca_file;
92 s3_client.reset(
new Aws::S3::S3Client(
96 s3_client.reset(
new Aws::S3::S3Client(
97 std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(), s3_config));
99 s3_client.reset(
new Aws::S3::S3Client(
100 std::make_shared<Aws::Auth::AnonymousAWSCredentialsProvider>(), s3_config));
103 auto list_objects_outcome = s3_client->ListObjectsV2(objects_request);
104 if (list_objects_outcome.IsSuccess()) {
107 auto object_list = list_objects_outcome.GetResult().GetContents();
111 if (0 == object_list.size()) {
113 throw std::runtime_error(
"no object was found with s3 url '" +
url +
"'");
117 LOG(
INFO) <<
"Found " << (
objkeys.empty() ?
"" :
"another ") << object_list.size()
118 <<
" objects with url '" +
url +
"':";
119 for (
auto const& obj : object_list) {
120 std::string objkey = obj.GetKey().c_str();
121 LOG(
INFO) <<
"\t" << objkey <<
" (size = " << obj.GetSize() <<
" bytes)";
124 boost::filesystem::path path{objkey};
125 if (0 == obj.GetSize()) {
128 if (
'/' == objkey.back()) {
131 if (
'.' == path.filename().string().front()) {
146 throw std::runtime_error(
"failed to list objects of s3 url '" +
url +
"': " +
147 list_objects_outcome.GetError().GetExceptionName() +
148 ": " + list_objects_outcome.GetError().GetMessage());
153 if (list_objects_outcome.GetResult().GetIsTruncated()) {
154 objects_request.SetContinuationToken(
155 list_objects_outcome.GetResult().GetNextContinuationToken());
174 std::exception_ptr& teptr,
175 const bool for_detection,
176 const bool allow_named_pipe_use,
177 const bool track_file_paths) {
179 static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
182 boost::filesystem::remove(file_path);
184 auto ext = strrchr(objkey.c_str(),
'.');
185 auto use_pipe = (
nullptr == ext || 0 != strcmp(ext,
".7z"));
186 #ifdef ENABLE_IMPORT_PARQUET
187 use_pipe = use_pipe && (
nullptr == ext || 0 != strcmp(ext,
".parquet"));
189 if (!allow_named_pipe_use) {
193 if (mkfifo(file_path.c_str(), 0660) < 0) {
194 throw std::runtime_error(
"failed to create named pipe '" + file_path +
195 "': " + strerror(errno));
222 Aws::S3::Model::GetObjectRequest object_request;
223 object_request.WithBucket(
bucket_name).WithKey(objkey);
226 if (use_pipe && for_detection) {
227 object_request.SetRange(
"bytes=0-10000000");
230 auto get_object_outcome = s3_client->GetObject(object_request);
231 if (!get_object_outcome.IsSuccess()) {
232 throw std::runtime_error(
"failed to get object '" + objkey +
"' of s3 url '" +
url +
233 "': " + get_object_outcome.GetError().GetExceptionName() +
234 ": " + get_object_outcome.GetError().GetMessage());
238 std::atomic<bool> is_get_object_outcome_moved(
false);
242 std::thread([=, &teptr, &get_object_outcome, &is_get_object_outcome_moved]() {
246 static std::mutex mutex_glog;
247 #define S3_LOG_WITH_LOCK(x) \
249 std::lock_guard<std::mutex> lock(mutex_glog); \
253 << objkey <<
" to " << (use_pipe ?
"pipe " :
"file ")
254 << file_path <<
"...")
255 auto get_object_outcome_moved =
256 decltype(get_object_outcome)(std::move(get_object_outcome));
257 is_get_object_outcome_moved =
true;
258 Aws::OFStream local_file;
259 local_file.
open(file_path.c_str(),
260 std::ios::out | std::ios::binary | std::ios::trunc);
261 local_file << get_object_outcome_moved.GetResult().GetBody().rdbuf();
264 << (use_pipe ? "pipe " : "file ") << file_path << ".")
269 teptr = std::current_exception();
279 while (!is_get_object_outcome_moved) {
280 std::this_thread::yield();
287 threads.push_back(std::move(th_writer));
298 if (track_file_paths) {
300 file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
310 boost::filesystem::remove(it->second);
const std::string land(const std::string &objkey, std::exception_ptr &teptr, const bool for_detection)
SslConfig get_ssl_config()
std::string s3_access_key
std::vector< Aws::S3::Model::Object > s3_objects_filter_sort_files(const std::vector< Aws::S3::Model::Object > &file_paths, const shared::FilePathOptions &options)
std::map< const std::string, const std::string > file_paths
std::optional< std::string > file_sort_regex
bool g_enable_smem_group_by true
std::optional< std::string > regex_path_filter
void init_for_read() override
std::optional< std::string > file_sort_order_by
std::string s3_session_token
const std::string url_part(const int i)
void get_s3_parameter_from_env_if_unset_or_empty(std::string &param, const std::string &env_variable_name)
std::string s3_secret_key
bool g_allow_s3_server_privileges
std::vector< std::string > objkeys
void vacuum(const std::string &objkey)
#define S3_LOG_WITH_LOCK(x)