OmniSciDB  a5dc49c757
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
LockMgr.cpp
Go to the documentation of this file.
1 /*
2  * Copyright 2022 HEAVY.AI, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "LockMgr/LockMgr.h"
18 #include "Catalog/Catalog.h"
19 #include "LockMgr/LockMgrImpl.h"
20 
21 namespace lockmgr {
22 
23 namespace helpers {
24 
26  const std::string& tableName) {
27  const auto table_id = cat.getTableId(tableName);
28  if (table_id.has_value()) {
29  ChunkKey chunk_key{cat.getCurrentDB().dbId, table_id.value()};
30  return chunk_key;
31  } else {
33  }
34 }
35 
36 } // namespace helpers
37 
39  ref_count_.fetch_add(1u);
40  if (!g_multi_instance) {
41  mutex_.lock();
42  } else {
43  dmutex_->lock();
44  }
45 }
46 
48  bool gotlock{false};
49  if (!g_multi_instance) {
50  gotlock = mutex_.try_lock();
51  } else {
52  gotlock = dmutex_->try_lock();
53  }
54  if (gotlock) {
55  ref_count_.fetch_add(1u);
56  }
57  return gotlock;
58 }
59 
61  if (!g_multi_instance) {
62  mutex_.unlock();
63  } else {
64  dmutex_->unlock();
65  }
66  ref_count_.fetch_sub(1u);
67 }
68 
70  ref_count_.fetch_add(1u);
71  if (!g_multi_instance) {
72  mutex_.lock_shared();
73  } else {
74  dmutex_->lock_shared();
75  }
76 }
77 
79  bool gotlock{false};
80  if (!g_multi_instance) {
81  gotlock = mutex_.try_lock_shared();
82  } else {
83  gotlock = dmutex_->try_lock_shared();
84  }
85  if (gotlock) {
86  ref_count_.fetch_add(1u);
87  }
88  return gotlock;
89 }
90 
92  if (!g_multi_instance) {
93  mutex_.unlock_shared();
94  } else {
95  dmutex_->unlock_shared();
96  }
97  ref_count_.fetch_sub(1u);
98 }
99 
100 template <class T>
102  static T mgr;
103  return mgr;
104 }
105 
106 template <class T>
108  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
109  auto mutex_it = table_mutex_map_.find(table_key);
110  if (mutex_it != table_mutex_map_.end()) {
111  return mutex_it->second.get();
112  }
113 
114  // NOTE(sy): Only used by --multi-instance clusters.
115  std::unique_ptr<heavyai::DistributedSharedMutex> dmutex =
116  getClusterTableMutex(table_key);
117 
118  return table_mutex_map_
119  .emplace(table_key, std::make_unique<MutexTracker>(std::move(dmutex)))
120  .first->second.get();
121 }
122 
123 template <class T>
124 std::set<ChunkKey> TableLockMgrImpl<T>::getLockedTables() const {
125  std::set<ChunkKey> ret;
126  std::lock_guard<std::mutex> access_map_lock(map_mutex_);
127  for (const auto& kv : table_mutex_map_) {
128  if (kv.second->isAcquired()) {
129  ret.insert(kv.first);
130  }
131  }
132 
133  return ret;
134 }
135 
136 template <class T>
138  const std::string& table_name) {
139  auto lock = WriteLock(getMutexTracker(cat, table_name));
140  // Ensure table still exists after lock is acquired.
141  validateExistingTable(cat, table_name);
142  return lock;
143 }
144 
145 template <class T>
147  auto& table_lock_mgr = T::instance();
148  return WriteLock(table_lock_mgr.getTableMutex(table_key));
149 }
150 
151 template <class T>
153  const std::string& table_name) {
154  auto lock = ReadLock(getMutexTracker(cat, table_name));
155  // Ensure table still exists after lock is acquired.
156  validateAndGetExistingTableId(cat, table_name);
157  return lock;
158 }
159 
160 template <class T>
162  auto& table_lock_mgr = T::instance();
163  return ReadLock(table_lock_mgr.getTableMutex(table_key));
164 }
165 
166 template <class T>
167 std::unique_ptr<heavyai::DistributedSharedMutex>
169  std::unique_ptr<heavyai::DistributedSharedMutex> table_mutex;
170 
171  std::string table_key_as_text;
172  for (auto n : table_key) {
173  table_key_as_text += (!table_key_as_text.empty() ? "_" : "") + std::to_string(n);
174  }
175 
176  // A callback used for syncing with most of the changed Catalog metadata, in-general,
177  // such as the list of tables that exist, dashboards, etc. This is accomplished by
178  // read locking, and immediately unlocking, dcatalogMutex_, so
179  // cat->reloadCatalogMetadataUnlocked() will be called.
180  auto cb_reload_catalog_metadata = [table_key](bool write) {
181  if constexpr (T::kind == "insert") {
182  CHECK(write); // The insert lock is for writing, never for reading.
183  }
184  auto cat =
186  CHECK(cat);
188  *cat->dcatalogMutex_);
189  };
190 
191  if constexpr (T::kind == "schema") {
192  // A callback used for reloading the Catalog schema for the one table being locked.
193  auto cb_reload_table_metadata = [table_key, table_key_as_text](size_t version) {
194  VLOG(2) << "reloading table metadata for: table_" << table_key_as_text;
195  CHECK_EQ(table_key.size(), 2U);
197  table_key[CHUNK_KEY_DB_IDX]);
198  CHECK(cat);
200  *cat->dcatalogMutex_);
201  cat->reloadTableMetadataUnlocked(table_key[CHUNK_KEY_TABLE_IDX]);
202  };
203 
204  // Create the table mutex.
206  cb_reload_catalog_metadata, // pre_lock_callback
207  cb_reload_table_metadata // reload_cache_callback
208  };
209  auto schema_lockfile{
210  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
212  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
213  table_mutex =
214  std::make_unique<heavyai::DistributedSharedMutex>(schema_lockfile.string(), cbs);
215  } else if constexpr (T::kind == "data" || T::kind == "insert") {
216  // A callback used for reloading the DataMgr data for the one table being locked.
217  auto cb_reload_table_data = [table_key, table_key_as_text](size_t version) {
218  VLOG(2) << "invalidating table caches for new version " << version << " of: table_"
219  << table_key_as_text;
220  CHECK_EQ(table_key.size(), 2U);
222  table_key[CHUNK_KEY_DB_IDX]);
223  CHECK(cat);
224  cat->invalidateCachesForTable(table_key[CHUNK_KEY_TABLE_IDX]);
225  };
226 
227  // Create the rows mutex.
228  auto rows_lockfile{std::filesystem::path(g_base_path) /
230  ("table_" + table_key_as_text + ".rows.lockfile")};
231  std::shared_ptr<heavyai::DistributedSharedMutex> rows_mutex =
232  std::make_shared<heavyai::DistributedSharedMutex>(
233  rows_lockfile.string(),
234  cb_reload_table_data // reload_cache_callback
235  );
236 
237  // A callback used for syncing with outside changes to this table's row data.
238  auto cb_reload_row_data = [table_key, rows_mutex](bool write) {
239  heavyai::shared_lock<heavyai::DistributedSharedMutex> rows_read_lock(*rows_mutex);
240  };
241 
242  // A callback to notify other nodes about our changes to this table's row data.
243  auto cb_notify_about_row_data = [table_key, rows_mutex](bool write) {
244  if (write) {
246  *rows_mutex);
247  }
248  };
249 
250  // Create the table mutex.
252  cb_reload_catalog_metadata, // pre_lock_callback
253  {},
254  cb_reload_row_data, // post_lock_callback
255  cb_notify_about_row_data // pre_unlock_callback
256  };
257  auto table_lockfile{
258  std::filesystem::path(g_base_path) / shared::kLockfilesDirectoryName /
260  ("table_" + table_key_as_text + "." + T::kind.data() + ".lockfile")};
261  table_mutex =
262  std::make_unique<heavyai::DistributedSharedMutex>(table_lockfile.string(), cbs);
263  } else {
264  UNREACHABLE() << "unexpected lockmgr kind: " << T::kind;
265  }
266 
267  return table_mutex;
268 }
269 
270 template <class T>
272  const Catalog_Namespace::Catalog& catalog,
273  const std::string& table_name) {
274  ChunkKey chunk_key{catalog.getDatabaseId(),
275  validateAndGetExistingTableId(catalog, table_name)};
276  auto& table_lock_mgr = T::instance();
277  MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
278  CHECK(tracker);
279  return tracker;
280 }
281 
282 template <class T>
284  const std::string& table_name) {
285  validateAndGetExistingTableId(catalog, table_name);
286 }
287 
288 template <class T>
290  const Catalog_Namespace::Catalog& catalog,
291  const std::string& table_name) {
292  auto table_id = catalog.getTableId(table_name);
293  if (!table_id.has_value()) {
294  throw Catalog_Namespace::TableNotFoundException(table_name, catalog.name());
295  }
296  return table_id.value();
297 }
298 
302 
303 } // namespace lockmgr
MutexTypeBase mutex_
Definition: LockMgrImpl.h:55
static MutexTracker * getMutexTracker(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgr.cpp:271
#define CHECK_EQ(x, y)
Definition: Logger.h:301
std::vector< int > ChunkKey
Definition: types.h:36
static int32_t validateAndGetExistingTableId(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgr.cpp:289
const std::string kDataDirectoryName
bool g_multi_instance
Definition: heavyai_locks.h:22
virtual std::unique_ptr< heavyai::DistributedSharedMutex > getClusterTableMutex(const ChunkKey &table_key) const
Definition: LockMgr.cpp:168
std::string cat(Ts &&...args)
class for a per-database catalog. also includes metadata for the current database and the current use...
Definition: Catalog.h:143
virtual bool try_lock_shared()
Definition: LockMgr.cpp:78
virtual void lock()
Definition: LockMgr.cpp:38
virtual bool try_lock()
Definition: LockMgr.cpp:47
#define CHUNK_KEY_DB_IDX
Definition: types.h:38
#define UNREACHABLE()
Definition: Logger.h:338
std::optional< int32_t > getTableId(const std::string &table_name) const
Definition: Catalog.cpp:1881
std::set< ChunkKey > getLockedTables() const
Definition: LockMgr.cpp:124
std::string to_string(char const *&&v)
static void validateExistingTable(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
Definition: LockMgr.cpp:283
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
Definition: File.cpp:143
std::shared_lock< T > shared_lock
std::string name() const
Definition: Catalog.h:348
This file contains the class specification and related data structures for Catalog.
TrackedRefLock< WriteLockBase > WriteLock
Definition: LockMgrImpl.h:88
TrackedRefLock< ReadLockBase > ReadLock
Definition: LockMgrImpl.h:89
static SysCatalog & instance()
Definition: SysCatalog.h:343
const DBMetadata & getCurrentDB() const
Definition: Catalog.h:265
std::string g_base_path
Definition: SysCatalog.cpp:62
string version
Definition: setup.in.py:73
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
Definition: types.h:39
int getDatabaseId() const
Definition: Catalog.h:326
ChunkKey chunk_key_for_table(const Catalog_Namespace::Catalog &cat, const std::string &tableName)
Definition: LockMgr.cpp:25
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::unique_ptr< heavyai::DistributedSharedMutex > dmutex_
Definition: LockMgrImpl.h:56
virtual void unlock()
Definition: LockMgr.cpp:60
virtual MutexTracker * getTableMutex(const ChunkKey &table_key)
Definition: LockMgr.cpp:107
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgr.cpp:137
T & instance()
Definition: LockMgr.cpp:101
const std::string kCatalogDirectoryName
virtual void unlock_shared()
Definition: LockMgr.cpp:91
virtual void lock_shared()
Definition: LockMgr.cpp:69
#define CHECK(condition)
Definition: Logger.h:291
const std::string kLockfilesDirectoryName
std::atomic< size_t > ref_count_
Definition: LockMgrImpl.h:54
constexpr double n
Definition: Utm.h:38
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)
Definition: LockMgr.cpp:152
#define VLOG(n)
Definition: Logger.h:388