OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CsvShared.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2021 OmniSci, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "CsvShared.h"
17 #include "CsvDataWrapper.h"
19 #include "FsiJsonUtils.h"
21 #include "Utils/DdlUtils.h"
22 
23 namespace foreign_storage {
24 
25 // Serialization functions for FileRegion
26 void set_value(rapidjson::Value& json_val,
27  const FileRegion& file_region,
28  rapidjson::Document::AllocatorType& allocator) {
29  json_val.SetObject();
31  json_val, file_region.first_row_file_offset, "first_row_file_offset", allocator);
33  json_val, file_region.first_row_index, "first_row_index", allocator);
35  json_val, file_region.region_size, "region_size", allocator);
37  json_val, file_region.row_count, "row_count", allocator);
38  if (file_region.filename.size()) {
40  json_val, file_region.filename, "filename", allocator);
41  }
42 }
43 
44 void get_value(const rapidjson::Value& json_val, FileRegion& file_region) {
45  CHECK(json_val.IsObject());
47  json_val, file_region.first_row_file_offset, "first_row_file_offset");
49  json_val, file_region.first_row_index, "first_row_index");
50  json_utils::get_value_from_object(json_val, file_region.region_size, "region_size");
51  json_utils::get_value_from_object(json_val, file_region.row_count, "row_count");
52  if (json_val.HasMember("filename")) {
53  json_utils::get_value_from_object(json_val, file_region.filename, "filename");
54  }
55 }
56 
57 namespace Csv {
58 namespace {
59 std::string validate_and_get_delimiter(const ForeignTable* foreign_table,
60  const std::string& option_name) {
61  if (auto it = foreign_table->options.find(option_name);
62  it != foreign_table->options.end()) {
63  if (it->second.length() == 1) {
64  return it->second;
65  } else {
66  if (it->second == std::string("\\n")) {
67  return "\n";
68  } else if (it->second == std::string("\\t")) {
69  return "\t";
70  } else {
71  throw std::runtime_error{"Invalid value specified for option \"" + option_name +
72  "\". Expected a single character, \"\\n\" or \"\\t\"."};
73  }
74  }
75  }
76  return "";
77 }
78 
79 std::string validate_and_get_string_with_length(const ForeignTable* foreign_table,
80  const std::string& option_name,
81  const size_t expected_num_chars) {
82  if (auto it = foreign_table->options.find(option_name);
83  it != foreign_table->options.end()) {
84  if (it->second.length() != expected_num_chars) {
85  throw std::runtime_error{"Value of \"" + option_name +
86  "\" foreign table option has the wrong number of "
87  "characters. Expected " +
88  std::to_string(expected_num_chars) + " character(s)."};
89  }
90  return it->second;
91  }
92  return "";
93 }
94 
95 std::optional<bool> validate_and_get_bool_value(const ForeignTable* foreign_table,
96  const std::string& option_name) {
97  if (auto it = foreign_table->options.find(option_name);
98  it != foreign_table->options.end()) {
99  if (boost::iequals(it->second, "TRUE")) {
100  return true;
101  } else if (boost::iequals(it->second, "FALSE")) {
102  return false;
103  } else {
104  throw std::runtime_error{"Invalid boolean value specified for \"" + option_name +
105  "\" foreign table option. "
106  "Value must be either 'true' or 'false'."};
107  }
108  }
109  return std::nullopt;
110 }
111 } // namespace
112 
// Determines whether the table requests S3 Select access via its "S3_ACCESS_TYPE" option.
// Returns false when the option is absent, false for "S3_DIRECT", and true for "S3_SELECT"
// (values are compared case-sensitively).
// Throws std::runtime_error when:
//  - the option is set but the server's storage-type option does not equal the required
//    value (that value is on lines not visible in this view; presumably the S3 storage
//    type — confirm), or
//  - the option value is neither "S3_DIRECT" nor "S3_SELECT".
113 bool validate_and_get_is_s3_select(const ForeignTable* foreign_table) {
114  static constexpr const char* S3_DIRECT = "S3_DIRECT";
115  static constexpr const char* S3_SELECT = "S3_SELECT";
116  static constexpr const char* S3_ACCESS_TYPE = "S3_ACCESS_TYPE";
117  auto access_type = foreign_table->options.find(S3_ACCESS_TYPE);
118 
119  if (access_type != foreign_table->options.end()) {
120  auto& server_options = foreign_table->foreign_server->options;
  // NOTE(review): the result of find() is dereferenced without comparing it to end();
  // this assumes every foreign server carries the storage-type option — confirm.
121  if (server_options.find(AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY)->second !=
123  throw std::runtime_error{
124  "The \"" + std::string{S3_ACCESS_TYPE} +
125  "\" option is only valid for foreign tables using servers with \"" +
126  AbstractFileStorageDataWrapper::STORAGE_TYPE_KEY + "\" option value of \"" +
128  }
  // Only the two access types listed above are accepted.
129  if (access_type->second != S3_DIRECT && access_type->second != S3_SELECT) {
130  throw std::runtime_error{
131  "Invalid value provided for the \"" + std::string{S3_ACCESS_TYPE} +
132  "\" option. Value must be one of the following: " + S3_DIRECT + ", " +
133  S3_SELECT + "."};
134  }
135  return (access_type->second == S3_SELECT);
136  } else {
137  return false;
138  }
139 }
140 
141 void validate_options(const ForeignTable* foreign_table) {
142  validate_and_get_copy_params(foreign_table);
143  validate_and_get_is_s3_select(foreign_table);
144 }
145 
147  const ForeignTable* foreign_table) {
148  import_export::CopyParams copy_params{};
149  copy_params.plain_text = true;
150  if (const auto& value =
151  validate_and_get_string_with_length(foreign_table, "ARRAY_DELIMITER", 1);
152  !value.empty()) {
153  copy_params.array_delim = value[0];
154  }
155  if (const auto& value =
156  validate_and_get_string_with_length(foreign_table, "ARRAY_MARKER", 2);
157  !value.empty()) {
158  copy_params.array_begin = value[0];
159  copy_params.array_end = value[1];
160  }
161  if (auto it = foreign_table->options.find("BUFFER_SIZE");
162  it != foreign_table->options.end()) {
163  copy_params.buffer_size = std::stoi(it->second);
164  }
165  if (const auto& value = validate_and_get_delimiter(foreign_table, "DELIMITER");
166  !value.empty()) {
167  copy_params.delimiter = value[0];
168  }
169  if (const auto& value = validate_and_get_string_with_length(foreign_table, "ESCAPE", 1);
170  !value.empty()) {
171  copy_params.escape = value[0];
172  }
173  auto has_header = validate_and_get_bool_value(foreign_table, "HEADER");
174  if (has_header.has_value()) {
175  if (has_header.value()) {
176  copy_params.has_header = import_export::ImportHeaderRow::HAS_HEADER;
177  } else {
178  copy_params.has_header = import_export::ImportHeaderRow::NO_HEADER;
179  }
180  }
181  if (const auto& value = validate_and_get_delimiter(foreign_table, "LINE_DELIMITER");
182  !value.empty()) {
183  copy_params.line_delim = value[0];
184  }
185  copy_params.lonlat =
186  validate_and_get_bool_value(foreign_table, "LONLAT").value_or(copy_params.lonlat);
187 
188  if (auto it = foreign_table->options.find("NULLS");
189  it != foreign_table->options.end()) {
190  copy_params.null_str = it->second;
191  }
192  if (const auto& value = validate_and_get_string_with_length(foreign_table, "QUOTE", 1);
193  !value.empty()) {
194  copy_params.quote = value[0];
195  }
196  copy_params.quoted =
197  validate_and_get_bool_value(foreign_table, "QUOTED").value_or(copy_params.quoted);
198  return copy_params;
199 }
200 
202  const ChunkKey& chunk_key,
203  const std::map<ChunkKey, std::shared_ptr<ChunkMetadata>>& chunk_metadata_map,
204  const std::map<ChunkKey, AbstractBuffer*>& buffers,
205  Chunk_NS::Chunk& chunk) {
  // Attaches the caller-provided buffer(s) for the column identified by `chunk_key` to
  // `chunk`, reserves capacity in them from `chunk_metadata_map`, and initializes the
  // chunk's encoder.
  // - Variable-length columns: the data buffer is keyed by chunk_key + {1} and the index
  //   buffer by chunk_key + {2}. Both must exist and be empty (CHECKed).
  // - Fixed-length columns: the single data buffer is keyed by chunk_key itself, and the
  //   index buffer passed to the chunk stays nullptr.
  // Missing buffers or metadata entries trigger CHECK failures rather than exceptions.
  // NOTE(review): the initializer of `catalog` is on a line not visible in this view.
206  auto catalog =
208  CHECK(catalog);
209 
210  ChunkKey data_chunk_key = chunk_key;
211  AbstractBuffer* data_buffer = nullptr;
212  AbstractBuffer* index_buffer = nullptr;
  // NOTE(review): `column` is dereferenced below without a null check.
213  const auto column = catalog->getMetadataForColumnUnlocked(
214  chunk_key[CHUNK_KEY_TABLE_IDX], chunk_key[CHUNK_KEY_COLUMN_IDX]);
215 
216  if (column->columnType.is_varlen_indeed()) {
217  data_chunk_key.push_back(1);
218  ChunkKey index_chunk_key = chunk_key;
219  index_chunk_key.push_back(2);
220 
221  CHECK(buffers.find(data_chunk_key) != buffers.end());
222  CHECK(buffers.find(index_chunk_key) != buffers.end());
223 
224  data_buffer = buffers.find(data_chunk_key)->second;
225  index_buffer = buffers.find(index_chunk_key)->second;
226  CHECK_EQ(data_buffer->size(), static_cast<size_t>(0));
227  CHECK_EQ(index_buffer->size(), static_cast<size_t>(0));
228 
  // Width of one index entry: StringOffsetT for strings/geometry, ArrayOffsetT for arrays.
229  size_t index_offset_size{0};
230  if (column->columnType.is_string() || column->columnType.is_geometry()) {
231  index_offset_size = sizeof(StringOffsetT);
232  } else if (column->columnType.is_array()) {
233  index_offset_size = sizeof(ArrayOffsetT);
234  } else {
235  UNREACHABLE();
236  }
237  CHECK(chunk_metadata_map.find(data_chunk_key) != chunk_metadata_map.end());
  // Reserve numElements + 1 index entries (presumably one extra for a trailing end offset).
238  index_buffer->reserve(index_offset_size *
239  (chunk_metadata_map.at(data_chunk_key)->numElements + 1));
240  } else {
  // Redundant with the initialization above, but harmless.
241  data_chunk_key = chunk_key;
242  CHECK(buffers.find(data_chunk_key) != buffers.end());
243  data_buffer = buffers.find(data_chunk_key)->second;
244  }
  // The data buffer is sized from the metadata's numBytes in both cases.
245  CHECK(chunk_metadata_map.find(data_chunk_key) != chunk_metadata_map.end());
246  data_buffer->reserve(chunk_metadata_map.at(data_chunk_key)->numBytes);
247 
248  chunk.setColumnDesc(column);
249  chunk.setBuffer(data_buffer);
250  chunk.setIndexBuffer(index_buffer);
251  chunk.initEncoder();
252 }
253 
254 std::shared_ptr<ChunkMetadata> get_placeholder_metadata(const ColumnDescriptor* column,
255  size_t num_elements) {
256  ForeignStorageBuffer empty_buffer;
257  // Use default encoder metadata as in parquet wrapper
258  empty_buffer.initEncoder(column->columnType);
259  auto chunk_metadata = empty_buffer.getEncoder()->getMetadata(column->columnType);
260  chunk_metadata->numElements = num_elements;
261 
262  if (!column->columnType.is_varlen_indeed()) {
263  chunk_metadata->numBytes = column->columnType.get_size() * num_elements;
264  }
265  // min/max not set by default for arrays, so get from elem type encoder
266  if (column->columnType.is_array()) {
267  ForeignStorageBuffer scalar_buffer;
268  scalar_buffer.initEncoder(column->columnType.get_elem_type());
269  auto scalar_metadata =
270  scalar_buffer.getEncoder()->getMetadata(column->columnType.get_elem_type());
271  chunk_metadata->chunkStats.min = scalar_metadata->chunkStats.min;
272  chunk_metadata->chunkStats.max = scalar_metadata->chunkStats.max;
273  }
274  chunk_metadata->chunkStats.has_nulls = true;
275  return chunk_metadata;
276 }
277 } // namespace Csv
278 
279 } // namespace foreign_storage
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
HOST DEVICE int get_size() const
Definition: sqltypes.h:403
import_export::CopyParams validate_and_get_copy_params(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:146
void setIndexBuffer(AbstractBuffer *ib)
Definition: Chunk.h:152
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
void get_value_from_object(const rapidjson::Value &object, T &value, const std::string &name)
Definition: JsonUtils.h:270
#define UNREACHABLE()
Definition: Logger.h:338
void get_value(const rapidjson::Value &json_val, FileRegion &file_region)
Definition: CsvShared.cpp:44
void initEncoder(const SQLTypeInfo &tmp_sql_type)
std::string to_string(char const *&&v)
void setBuffer(AbstractBuffer *b)
Definition: Chunk.h:150
void validate_options(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:141
int32_t StringOffsetT
Definition: sqltypes.h:1495
virtual void getMetadata(const std::shared_ptr< ChunkMetadata > &chunkMetadata)
Definition: Encoder.cpp:231
std::shared_ptr< ChunkMetadata > get_placeholder_metadata(const ColumnDescriptor *column, size_t num_elements)
Definition: CsvShared.cpp:254
std::string validate_and_get_string_with_length(const ForeignTable *foreign_table, const std::string &option_name, const size_t expected_num_chars)
static SysCatalog & instance()
Definition: SysCatalog.h:343
std::optional< bool > validate_and_get_bool_value(const ForeignTable *foreign_table, const std::string &option_name)
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
An AbstractBuffer is a unit of data management for a data manager.
specifies the content in-memory of a row in the column metadata table
std::string validate_and_get_delimiter(const ForeignTable *foreign_table, const std::string &option_name)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
int32_t ArrayOffsetT
Definition: sqltypes.h:1496
void add_value_to_object(rapidjson::Value &object, const T &value, const std::string &name, rapidjson::Document::AllocatorType &allocator)
Definition: JsonUtils.h:255
void init_chunk_for_column(const ChunkKey &chunk_key, const std::map< ChunkKey, std::shared_ptr< ChunkMetadata >> &chunk_metadata_map, const std::map< ChunkKey, AbstractBuffer * > &buffers, Chunk_NS::Chunk &chunk)
Definition: CsvShared.cpp:201
void initEncoder()
Definition: Chunk.cpp:290
const ForeignServer * foreign_server
Definition: ForeignTable.h:57
#define CHECK(condition)
Definition: Logger.h:291
void setColumnDesc(const ColumnDescriptor *cd)
Definition: Chunk.h:67
SQLTypeInfo columnType
bool is_varlen_indeed() const
Definition: sqltypes.h:637
#define CHUNK_KEY_COLUMN_IDX
Definition: types.h:40
SQLTypeInfo get_elem_type() const
Definition: sqltypes.h:977
virtual void reserve(size_t num_bytes)=0
bool validate_and_get_is_s3_select(const ForeignTable *foreign_table)
Definition: CsvShared.cpp:113
void set_value(rapidjson::Value &json_val, const FileRegion &file_region, rapidjson::Document::AllocatorType &allocator)
Definition: CsvShared.cpp:26
bool is_array() const
Definition: sqltypes.h:585