26 const std::string& tableName) {
27 const auto table_id = cat.
getTableId(tableName);
28 if (table_id.has_value()) {
50 gotlock =
mutex_.try_lock();
81 gotlock =
mutex_.try_lock_shared();
83 gotlock =
dmutex_->try_lock_shared();
108 std::lock_guard<std::mutex> access_map_lock(map_mutex_);
109 auto mutex_it = table_mutex_map_.find(table_key);
110 if (mutex_it != table_mutex_map_.end()) {
111 return mutex_it->second.get();
115 std::unique_ptr<heavyai::DistributedSharedMutex> dmutex =
116 getClusterTableMutex(table_key);
118 return table_mutex_map_
119 .emplace(table_key, std::make_unique<MutexTracker>(std::move(dmutex)))
120 .first->second.get();
125 std::set<ChunkKey> ret;
126 std::lock_guard<std::mutex> access_map_lock(map_mutex_);
127 for (
const auto& kv : table_mutex_map_) {
128 if (kv.second->isAcquired()) {
129 ret.insert(kv.first);
138 const std::string& table_name) {
139 auto lock =
WriteLock(getMutexTracker(cat, table_name));
141 validateExistingTable(cat, table_name);
148 return WriteLock(table_lock_mgr.getTableMutex(table_key));
153 const std::string& table_name) {
154 auto lock =
ReadLock(getMutexTracker(cat, table_name));
156 validateAndGetExistingTableId(cat, table_name);
163 return ReadLock(table_lock_mgr.getTableMutex(table_key));
167 std::unique_ptr<heavyai::DistributedSharedMutex>
169 std::unique_ptr<heavyai::DistributedSharedMutex> table_mutex;
171 std::string table_key_as_text;
172 for (
auto n : table_key) {
173 table_key_as_text += (!table_key_as_text.empty() ?
"_" :
"") +
std::to_string(
n);
180 auto cb_reload_catalog_metadata = [table_key](
bool write) {
181 if constexpr (T::kind ==
"insert") {
188 *
cat->dcatalogMutex_);
191 if constexpr (T::kind ==
"schema") {
193 auto cb_reload_table_metadata = [table_key, table_key_as_text](
size_t version) {
194 VLOG(2) <<
"reloading table metadata for: table_" << table_key_as_text;
200 *cat->dcatalogMutex_);
206 cb_reload_catalog_metadata,
207 cb_reload_table_metadata
209 auto schema_lockfile{
212 (
"table_" + table_key_as_text +
"." + T::kind.data() +
".lockfile")};
214 std::make_unique<heavyai::DistributedSharedMutex>(schema_lockfile.string(), cbs);
215 }
else if constexpr (T::kind ==
"data" || T::kind ==
"insert") {
217 auto cb_reload_table_data = [table_key, table_key_as_text](
size_t version) {
218 VLOG(2) <<
"invalidating table caches for new version " <<
version <<
" of: table_"
219 << table_key_as_text;
228 auto rows_lockfile{std::filesystem::path(
g_base_path) /
230 (
"table_" + table_key_as_text +
".rows.lockfile")};
231 std::shared_ptr<heavyai::DistributedSharedMutex> rows_mutex =
232 std::make_shared<heavyai::DistributedSharedMutex>(
233 rows_lockfile.string(),
238 auto cb_reload_row_data = [table_key, rows_mutex](
bool write) {
243 auto cb_notify_about_row_data = [table_key, rows_mutex](
bool write) {
252 cb_reload_catalog_metadata,
255 cb_notify_about_row_data
260 (
"table_" + table_key_as_text +
"." + T::kind.data() +
".lockfile")};
262 std::make_unique<heavyai::DistributedSharedMutex>(table_lockfile.string(), cbs);
264 UNREACHABLE() <<
"unexpected lockmgr kind: " << T::kind;
273 const std::string& table_name) {
275 validateAndGetExistingTableId(catalog, table_name)};
277 MutexTracker* tracker = table_lock_mgr.getTableMutex(chunk_key);
284 const std::string& table_name) {
285 validateAndGetExistingTableId(catalog, table_name);
291 const std::string& table_name) {
292 auto table_id = catalog.
getTableId(table_name);
293 if (!table_id.has_value()) {
296 return table_id.value();
static MutexTracker * getMutexTracker(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
std::vector< int > ChunkKey
static int32_t validateAndGetExistingTableId(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
const std::string kDataDirectoryName
virtual std::unique_ptr< heavyai::DistributedSharedMutex > getClusterTableMutex(const ChunkKey &table_key) const
class for a per-database catalog. also includes metadata for the current database and the current use...
virtual bool try_lock_shared()
std::optional< int32_t > getTableId(const std::string &table_name) const
std::set< ChunkKey > getLockedTables() const
static void validateExistingTable(const Catalog_Namespace::Catalog &catalog, const std::string &table_name)
size_t write(FILE *f, const size_t offset, const size_t size, const int8_t *buf)
Writes the specified number of bytes to the offset position in file f from buf.
std::shared_lock< T > shared_lock
This file contains the class specification and related data structures for Catalog.
TrackedRefLock< WriteLockBase > WriteLock
TrackedRefLock< ReadLockBase > ReadLock
static SysCatalog & instance()
const DBMetadata & getCurrentDB() const
std::unique_lock< T > unique_lock
#define CHUNK_KEY_TABLE_IDX
int getDatabaseId() const
ChunkKey chunk_key_for_table(const Catalog_Namespace::Catalog &cat, const std::string &tableName)
std::shared_ptr< Catalog > getCatalog(const std::string &dbName)
std::unique_ptr< heavyai::DistributedSharedMutex > dmutex_
virtual MutexTracker * getTableMutex(const ChunkKey &table_key)
static WriteLock getWriteLockForTable(const Catalog_Namespace::Catalog &cat, const std::string &table_name)
const std::string kCatalogDirectoryName
virtual void unlock_shared()
virtual void lock_shared()
const std::string kLockfilesDirectoryName
std::atomic< size_t > ref_count_
static ReadLock getReadLockForTable(Catalog_Namespace::Catalog &cat, const std::string &table_name)