1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include <linux/module.h> 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "recoverd.h" 18 #include "dir.h" 19 #include "midcomms.h" 20 #include "config.h" 21 #include "memory.h" 22 #include "lock.h" 23 #include "recover.h" 24 #include "requestqueue.h" 25 #include "user.h" 26 #include "ast.h" 27 28 static int ls_count; 29 static struct mutex ls_lock; 30 static struct list_head lslist; 31 static spinlock_t lslist_lock; 32 33 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) 34 { 35 ssize_t ret = len; 36 int n; 37 int rc = kstrtoint(buf, 0, &n); 38 39 if (rc) 40 return rc; 41 ls = dlm_find_lockspace_local(ls); 42 if (!ls) 43 return -EINVAL; 44 45 switch (n) { 46 case 0: 47 dlm_ls_stop(ls); 48 break; 49 case 1: 50 dlm_ls_start(ls); 51 break; 52 default: 53 ret = -EINVAL; 54 } 55 dlm_put_lockspace(ls); 56 return ret; 57 } 58 59 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len) 60 { 61 int rc = kstrtoint(buf, 0, &ls->ls_uevent_result); 62 63 if (rc) 64 return rc; 65 set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags); 66 wake_up(&ls->ls_uevent_wait); 67 return len; 68 } 69 70 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf) 71 { 72 return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id); 73 } 74 75 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) 76 { 77 int rc = kstrtouint(buf, 0, &ls->ls_global_id); 78 79 if (rc) 80 return rc; 81 return len; 82 } 83 84 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf) 85 { 86 return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls)); 87 } 88 89 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len) 90 { 91 int val; 92 int rc = kstrtoint(buf, 0, &val); 93 94 if (rc) 95 return rc; 96 if (val == 1) 97 set_bit(LSFL_NODIR, &ls->ls_flags); 98 return len; 99 } 100 101 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) 102 { 103 uint32_t status = dlm_recover_status(ls); 104 return snprintf(buf, PAGE_SIZE, "%x\n", status); 105 } 106 107 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf) 108 { 109 return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid); 110 } 111 112 struct dlm_attr { 113 struct attribute attr; 114 ssize_t (*show)(struct dlm_ls *, char *); 115 ssize_t (*store)(struct dlm_ls *, const char *, size_t); 116 }; 117 118 static struct dlm_attr dlm_attr_control = { 119 .attr = {.name = "control", .mode = S_IWUSR}, 120 .store = dlm_control_store 121 }; 122 123 static struct dlm_attr dlm_attr_event = { 124 .attr = {.name = "event_done", .mode = S_IWUSR}, 125 .store = dlm_event_store 126 }; 127 128 static struct dlm_attr dlm_attr_id = { 129 .attr = {.name = "id", .mode = S_IRUGO | S_IWUSR}, 130 .show = dlm_id_show, 131 .store = dlm_id_store 132 }; 133 134 static struct dlm_attr dlm_attr_nodir = { 135 .attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR}, 136 .show = dlm_nodir_show, 137 .store = dlm_nodir_store 138 }; 139 140 static struct dlm_attr dlm_attr_recover_status = { 141 .attr = {.name = "recover_status", .mode = S_IRUGO}, 142 .show = dlm_recover_status_show 143 }; 144 145 static struct dlm_attr dlm_attr_recover_nodeid = { 146 .attr = {.name = "recover_nodeid", .mode = S_IRUGO}, 147 .show = dlm_recover_nodeid_show 148 }; 149 150 static struct attribute *dlm_attrs[] = { 151 &dlm_attr_control.attr, 152 &dlm_attr_event.attr, 153 &dlm_attr_id.attr, 154 &dlm_attr_nodir.attr, 155 &dlm_attr_recover_status.attr, 156 &dlm_attr_recover_nodeid.attr, 157 NULL, 158 }; 159 ATTRIBUTE_GROUPS(dlm); 160 161 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr, 162 char *buf) 163 { 164 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 165 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 166 return a->show ? a->show(ls, buf) : 0; 167 } 168 169 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr, 170 const char *buf, size_t len) 171 { 172 struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 173 struct dlm_attr *a = container_of(attr, struct dlm_attr, attr); 174 return a->store ? a->store(ls, buf, len) : len; 175 } 176 177 static void lockspace_kobj_release(struct kobject *k) 178 { 179 struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj); 180 kfree(ls); 181 } 182 183 static const struct sysfs_ops dlm_attr_ops = { 184 .show = dlm_attr_show, 185 .store = dlm_attr_store, 186 }; 187 188 static struct kobj_type dlm_ktype = { 189 .default_groups = dlm_groups, 190 .sysfs_ops = &dlm_attr_ops, 191 .release = lockspace_kobj_release, 192 }; 193 194 static struct kset *dlm_kset; 195 196 static int do_uevent(struct dlm_ls *ls, int in) 197 { 198 if (in) 199 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE); 200 else 201 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE); 202 203 log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving"); 204 205 /* dlm_controld will see the uevent, do the necessary group management 206 and then write to sysfs to wake us */ 207 208 wait_event(ls->ls_uevent_wait, 209 test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags)); 210 211 log_rinfo(ls, "group event done %d", ls->ls_uevent_result); 212 213 return ls->ls_uevent_result; 214 } 215 216 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env) 217 { 218 const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj); 219 220 add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name); 221 return 0; 222 } 223 224 static const struct kset_uevent_ops dlm_uevent_ops = { 225 .uevent = dlm_uevent, 226 }; 227 228 int __init dlm_lockspace_init(void) 229 { 230 ls_count = 0; 231 mutex_init(&ls_lock); 232 INIT_LIST_HEAD(&lslist); 233 spin_lock_init(&lslist_lock); 234 235 dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj); 236 if (!dlm_kset) { 237 printk(KERN_WARNING "%s: can not create kset\n", __func__); 238 return -ENOMEM; 239 } 240 return 0; 241 } 242 243 void dlm_lockspace_exit(void) 244 { 245 kset_unregister(dlm_kset); 246 } 247 248 struct dlm_ls *dlm_find_lockspace_global(uint32_t id) 249 { 250 struct dlm_ls *ls; 251 252 spin_lock_bh(&lslist_lock); 253 254 list_for_each_entry(ls, &lslist, ls_list) { 255 if (ls->ls_global_id == id) { 256 atomic_inc(&ls->ls_count); 257 goto out; 258 } 259 } 260 ls = NULL; 261 out: 262 spin_unlock_bh(&lslist_lock); 263 return ls; 264 } 265 266 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) 267 { 268 struct dlm_ls *ls = lockspace; 269 270 atomic_inc(&ls->ls_count); 271 return ls; 272 } 273 274 struct dlm_ls *dlm_find_lockspace_device(int minor) 275 { 276 struct dlm_ls *ls; 277 278 spin_lock_bh(&lslist_lock); 279 list_for_each_entry(ls, &lslist, ls_list) { 280 if (ls->ls_device.minor == minor) { 281 atomic_inc(&ls->ls_count); 282 goto out; 283 } 284 } 285 ls = NULL; 286 out: 287 spin_unlock_bh(&lslist_lock); 288 return ls; 289 } 290 291 void dlm_put_lockspace(struct dlm_ls *ls) 292 { 293 if (atomic_dec_and_test(&ls->ls_count)) 294 wake_up(&ls->ls_count_wait); 295 } 296 297 static void remove_lockspace(struct dlm_ls *ls) 298 { 299 retry: 300 wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); 301 302 spin_lock_bh(&lslist_lock); 303 if (atomic_read(&ls->ls_count) != 0) { 304 spin_unlock_bh(&lslist_lock); 305 goto retry; 306 } 307 308 WARN_ON(ls->ls_create_count != 0); 309 list_del(&ls->ls_list); 310 spin_unlock_bh(&lslist_lock); 311 } 312 313 static int threads_start(void) 314 { 315 int error; 316 317 /* Thread for sending/receiving messages for all lockspace's */ 318 error = dlm_midcomms_start(); 319 if (error) 320 log_print("cannot start dlm midcomms %d", error); 321 322 return error; 323 } 324 325 static int new_lockspace(const char *name, const char *cluster, 326 uint32_t flags, int lvblen, 327 const struct dlm_lockspace_ops *ops, void *ops_arg, 328 int *ops_result, dlm_lockspace_t **lockspace) 329 { 330 struct dlm_ls *ls; 331 int do_unreg = 0; 332 int namelen = strlen(name); 333 int error; 334 335 if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) 336 return -EINVAL; 337 338 if (lvblen % 8) 339 return -EINVAL; 340 341 if (!try_module_get(THIS_MODULE)) 342 return -EINVAL; 343 344 if (!dlm_user_daemon_available()) { 345 log_print("dlm user daemon not available"); 346 error = -EUNATCH; 347 goto out; 348 } 349 350 if (ops && ops_result) { 351 if (!dlm_config.ci_recover_callbacks) 352 *ops_result = -EOPNOTSUPP; 353 else 354 *ops_result = 0; 355 } 356 357 if (!cluster) 358 log_print("dlm cluster name '%s' is being used without an application provided cluster name", 359 dlm_config.ci_cluster_name); 360 361 if (dlm_config.ci_recover_callbacks && cluster && 362 strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) { 363 log_print("dlm cluster name '%s' does not match " 364 "the application cluster name '%s'", 365 dlm_config.ci_cluster_name, cluster); 366 error = -EBADR; 367 goto out; 368 } 369 370 error = 0; 371 372 spin_lock_bh(&lslist_lock); 373 list_for_each_entry(ls, &lslist, ls_list) { 374 WARN_ON(ls->ls_create_count <= 0); 375 if (ls->ls_namelen != namelen) 376 continue; 377 if (memcmp(ls->ls_name, name, namelen)) 378 continue; 379 if (flags & DLM_LSFL_NEWEXCL) { 380 error = -EEXIST; 381 break; 382 } 383 ls->ls_create_count++; 384 *lockspace = ls; 385 error = 1; 386 break; 387 } 388 spin_unlock_bh(&lslist_lock); 389 390 if (error) 391 goto out; 392 393 error = -ENOMEM; 394 395 ls = kzalloc(sizeof(*ls), GFP_NOFS); 396 if (!ls) 397 goto out; 398 memcpy(ls->ls_name, name, namelen); 399 ls->ls_namelen = namelen; 400 ls->ls_lvblen = lvblen; 401 atomic_set(&ls->ls_count, 0); 402 init_waitqueue_head(&ls->ls_count_wait); 403 ls->ls_flags = 0; 404 405 if (ops && dlm_config.ci_recover_callbacks) { 406 ls->ls_ops = ops; 407 ls->ls_ops_arg = ops_arg; 408 } 409 410 /* ls_exflags are forced to match among nodes, and we don't 411 * need to require all nodes to have some flags set 412 */ 413 ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); 414 415 INIT_LIST_HEAD(&ls->ls_toss); 416 INIT_LIST_HEAD(&ls->ls_keep); 417 rwlock_init(&ls->ls_rsbtbl_lock); 418 419 error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); 420 if (error) 421 goto out_lsfree; 422 423 idr_init(&ls->ls_lkbidr); 424 rwlock_init(&ls->ls_lkbidr_lock); 425 426 INIT_LIST_HEAD(&ls->ls_waiters); 427 spin_lock_init(&ls->ls_waiters_lock); 428 INIT_LIST_HEAD(&ls->ls_orphans); 429 spin_lock_init(&ls->ls_orphans_lock); 430 431 INIT_LIST_HEAD(&ls->ls_nodes); 432 INIT_LIST_HEAD(&ls->ls_nodes_gone); 433 ls->ls_num_nodes = 0; 434 ls->ls_low_nodeid = 0; 435 ls->ls_total_weight = 0; 436 ls->ls_node_array = NULL; 437 438 memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb)); 439 ls->ls_local_rsb.res_ls = ls; 440 441 ls->ls_debug_rsb_dentry = NULL; 442 ls->ls_debug_waiters_dentry = NULL; 443 444 init_waitqueue_head(&ls->ls_uevent_wait); 445 ls->ls_uevent_result = 0; 446 init_completion(&ls->ls_recovery_done); 447 ls->ls_recovery_result = -1; 448 449 spin_lock_init(&ls->ls_cb_lock); 450 INIT_LIST_HEAD(&ls->ls_cb_delay); 451 452 ls->ls_recoverd_task = NULL; 453 mutex_init(&ls->ls_recoverd_active); 454 spin_lock_init(&ls->ls_recover_lock); 455 spin_lock_init(&ls->ls_rcom_spin); 456 get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t)); 457 ls->ls_recover_status = 0; 458 ls->ls_recover_seq = get_random_u64(); 459 ls->ls_recover_args = NULL; 460 init_rwsem(&ls->ls_in_recovery); 461 rwlock_init(&ls->ls_recv_active); 462 INIT_LIST_HEAD(&ls->ls_requestqueue); 463 rwlock_init(&ls->ls_requestqueue_lock); 464 spin_lock_init(&ls->ls_clear_proc_locks); 465 466 /* Due backwards compatibility with 3.1 we need to use maximum 467 * possible dlm message size to be sure the message will fit and 468 * not having out of bounds issues. However on sending side 3.2 469 * might send less. 470 */ 471 ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); 472 if (!ls->ls_recover_buf) { 473 error = -ENOMEM; 474 goto out_lkbidr; 475 } 476 477 ls->ls_slot = 0; 478 ls->ls_num_slots = 0; 479 ls->ls_slots_size = 0; 480 ls->ls_slots = NULL; 481 482 INIT_LIST_HEAD(&ls->ls_recover_list); 483 spin_lock_init(&ls->ls_recover_list_lock); 484 idr_init(&ls->ls_recover_idr); 485 spin_lock_init(&ls->ls_recover_idr_lock); 486 ls->ls_recover_list_count = 0; 487 init_waitqueue_head(&ls->ls_wait_general); 488 INIT_LIST_HEAD(&ls->ls_masters_list); 489 rwlock_init(&ls->ls_masters_lock); 490 INIT_LIST_HEAD(&ls->ls_dir_dump_list); 491 rwlock_init(&ls->ls_dir_dump_lock); 492 493 INIT_LIST_HEAD(&ls->ls_toss_q); 494 spin_lock_init(&ls->ls_toss_q_lock); 495 timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, 496 TIMER_DEFERRABLE); 497 498 spin_lock_bh(&lslist_lock); 499 ls->ls_create_count = 1; 500 list_add(&ls->ls_list, &lslist); 501 spin_unlock_bh(&lslist_lock); 502 503 if (flags & DLM_LSFL_FS) { 504 error = dlm_callback_start(ls); 505 if (error) { 506 log_error(ls, "can't start dlm_callback %d", error); 507 goto out_delist; 508 } 509 } 510 511 init_waitqueue_head(&ls->ls_recover_lock_wait); 512 513 /* 514 * Once started, dlm_recoverd first looks for ls in lslist, then 515 * initializes ls_in_recovery as locked in "down" mode. We need 516 * to wait for the wakeup from dlm_recoverd because in_recovery 517 * has to start out in down mode. 518 */ 519 520 error = dlm_recoverd_start(ls); 521 if (error) { 522 log_error(ls, "can't start dlm_recoverd %d", error); 523 goto out_callback; 524 } 525 526 wait_event(ls->ls_recover_lock_wait, 527 test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags)); 528 529 /* let kobject handle freeing of ls if there's an error */ 530 do_unreg = 1; 531 532 ls->ls_kobj.kset = dlm_kset; 533 error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL, 534 "%s", ls->ls_name); 535 if (error) 536 goto out_recoverd; 537 kobject_uevent(&ls->ls_kobj, KOBJ_ADD); 538 539 /* This uevent triggers dlm_controld in userspace to add us to the 540 group of nodes that are members of this lockspace (managed by the 541 cluster infrastructure.) Once it's done that, it tells us who the 542 current lockspace members are (via configfs) and then tells the 543 lockspace to start running (via sysfs) in dlm_ls_start(). */ 544 545 error = do_uevent(ls, 1); 546 if (error) 547 goto out_recoverd; 548 549 /* wait until recovery is successful or failed */ 550 wait_for_completion(&ls->ls_recovery_done); 551 error = ls->ls_recovery_result; 552 if (error) 553 goto out_members; 554 555 dlm_create_debug_file(ls); 556 557 log_rinfo(ls, "join complete"); 558 *lockspace = ls; 559 return 0; 560 561 out_members: 562 do_uevent(ls, 0); 563 dlm_clear_members(ls); 564 kfree(ls->ls_node_array); 565 out_recoverd: 566 dlm_recoverd_stop(ls); 567 out_callback: 568 dlm_callback_stop(ls); 569 out_delist: 570 spin_lock_bh(&lslist_lock); 571 list_del(&ls->ls_list); 572 spin_unlock_bh(&lslist_lock); 573 idr_destroy(&ls->ls_recover_idr); 574 kfree(ls->ls_recover_buf); 575 out_lkbidr: 576 idr_destroy(&ls->ls_lkbidr); 577 rhashtable_destroy(&ls->ls_rsbtbl); 578 out_lsfree: 579 if (do_unreg) 580 kobject_put(&ls->ls_kobj); 581 else 582 kfree(ls); 583 out: 584 module_put(THIS_MODULE); 585 return error; 586 } 587 588 static int __dlm_new_lockspace(const char *name, const char *cluster, 589 uint32_t flags, int lvblen, 590 const struct dlm_lockspace_ops *ops, 591 void *ops_arg, int *ops_result, 592 dlm_lockspace_t **lockspace) 593 { 594 int error = 0; 595 596 mutex_lock(&ls_lock); 597 if (!ls_count) 598 error = threads_start(); 599 if (error) 600 goto out; 601 602 error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg, 603 ops_result, lockspace); 604 if (!error) 605 ls_count++; 606 if (error > 0) 607 error = 0; 608 if (!ls_count) { 609 dlm_midcomms_shutdown(); 610 dlm_midcomms_stop(); 611 } 612 out: 613 mutex_unlock(&ls_lock); 614 return error; 615 } 616 617 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags, 618 int lvblen, const struct dlm_lockspace_ops *ops, 619 void *ops_arg, int *ops_result, 620 dlm_lockspace_t **lockspace) 621 { 622 return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen, 623 ops, ops_arg, ops_result, lockspace); 624 } 625 626 int dlm_new_user_lockspace(const char *name, const char *cluster, 627 uint32_t flags, int lvblen, 628 const struct dlm_lockspace_ops *ops, 629 void *ops_arg, int *ops_result, 630 dlm_lockspace_t **lockspace) 631 { 632 return __dlm_new_lockspace(name, cluster, flags, lvblen, ops, 633 ops_arg, ops_result, lockspace); 634 } 635 636 static int lkb_idr_is_local(int id, void *p, void *data) 637 { 638 struct dlm_lkb *lkb = p; 639 640 return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV; 641 } 642 643 static int lkb_idr_is_any(int id, void *p, void *data) 644 { 645 return 1; 646 } 647 648 static int lkb_idr_free(int id, void *p, void *data) 649 { 650 struct dlm_lkb *lkb = p; 651 652 if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) 653 dlm_free_lvb(lkb->lkb_lvbptr); 654 655 dlm_free_lkb(lkb); 656 return 0; 657 } 658 659 /* NOTE: We check the lkbidr here rather than the resource table. 660 This is because there may be LKBs queued as ASTs that have been unlinked 661 from their RSBs and are pending deletion once the AST has been delivered */ 662 663 static int lockspace_busy(struct dlm_ls *ls, int force) 664 { 665 int rv; 666 667 read_lock_bh(&ls->ls_lkbidr_lock); 668 if (force == 0) { 669 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); 670 } else if (force == 1) { 671 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls); 672 } else { 673 rv = 0; 674 } 675 read_unlock_bh(&ls->ls_lkbidr_lock); 676 return rv; 677 } 678 679 static void rhash_free_rsb(void *ptr, void *arg) 680 { 681 struct dlm_rsb *rsb = ptr; 682 683 dlm_free_rsb(rsb); 684 } 685 686 static int release_lockspace(struct dlm_ls *ls, int force) 687 { 688 int busy, rv; 689 690 busy = lockspace_busy(ls, force); 691 692 spin_lock_bh(&lslist_lock); 693 if (ls->ls_create_count == 1) { 694 if (busy) { 695 rv = -EBUSY; 696 } else { 697 /* remove_lockspace takes ls off lslist */ 698 ls->ls_create_count = 0; 699 rv = 0; 700 } 701 } else if (ls->ls_create_count > 1) { 702 rv = --ls->ls_create_count; 703 } else { 704 rv = -EINVAL; 705 } 706 spin_unlock_bh(&lslist_lock); 707 708 if (rv) { 709 log_debug(ls, "release_lockspace no remove %d", rv); 710 return rv; 711 } 712 713 if (ls_count == 1) 714 dlm_midcomms_version_wait(); 715 716 dlm_device_deregister(ls); 717 718 if (force < 3 && dlm_user_daemon_available()) 719 do_uevent(ls, 0); 720 721 dlm_recoverd_stop(ls); 722 723 /* clear the LSFL_RUNNING flag to fast up 724 * time_shutdown_sync(), we don't care anymore 725 */ 726 clear_bit(LSFL_RUNNING, &ls->ls_flags); 727 timer_shutdown_sync(&ls->ls_timer); 728 729 if (ls_count == 1) { 730 dlm_clear_members(ls); 731 dlm_midcomms_shutdown(); 732 } 733 734 dlm_callback_stop(ls); 735 736 remove_lockspace(ls); 737 738 dlm_delete_debug_file(ls); 739 740 idr_destroy(&ls->ls_recover_idr); 741 kfree(ls->ls_recover_buf); 742 743 /* 744 * Free all lkb's in idr 745 */ 746 747 idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls); 748 idr_destroy(&ls->ls_lkbidr); 749 750 /* 751 * Free all rsb's on rsbtbl 752 */ 753 rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); 754 755 /* 756 * Free structures on any other lists 757 */ 758 759 dlm_purge_requestqueue(ls); 760 kfree(ls->ls_recover_args); 761 dlm_clear_members(ls); 762 dlm_clear_members_gone(ls); 763 kfree(ls->ls_node_array); 764 log_rinfo(ls, "release_lockspace final free"); 765 kobject_put(&ls->ls_kobj); 766 /* The ls structure will be freed when the kobject is done with */ 767 768 module_put(THIS_MODULE); 769 return 0; 770 } 771 772 /* 773 * Called when a system has released all its locks and is not going to use the 774 * lockspace any longer. We free everything we're managing for this lockspace. 775 * Remaining nodes will go through the recovery process as if we'd died. The 776 * lockspace must continue to function as usual, participating in recoveries, 777 * until this returns. 778 * 779 * Force has 4 possible values: 780 * 0 - don't destroy lockspace if it has any LKBs 781 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs 782 * 2 - destroy lockspace regardless of LKBs 783 * 3 - destroy lockspace as part of a forced shutdown 784 */ 785 786 int dlm_release_lockspace(void *lockspace, int force) 787 { 788 struct dlm_ls *ls; 789 int error; 790 791 ls = dlm_find_lockspace_local(lockspace); 792 if (!ls) 793 return -EINVAL; 794 dlm_put_lockspace(ls); 795 796 mutex_lock(&ls_lock); 797 error = release_lockspace(ls, force); 798 if (!error) 799 ls_count--; 800 if (!ls_count) 801 dlm_midcomms_stop(); 802 mutex_unlock(&ls_lock); 803 804 return error; 805 } 806 807 void dlm_stop_lockspaces(void) 808 { 809 struct dlm_ls *ls; 810 int count; 811 812 restart: 813 count = 0; 814 spin_lock_bh(&lslist_lock); 815 list_for_each_entry(ls, &lslist, ls_list) { 816 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { 817 count++; 818 continue; 819 } 820 spin_unlock_bh(&lslist_lock); 821 log_error(ls, "no userland control daemon, stopping lockspace"); 822 dlm_ls_stop(ls); 823 goto restart; 824 } 825 spin_unlock_bh(&lslist_lock); 826 827 if (count) 828 log_print("dlm user daemon left %d lockspaces", count); 829 } 830