xref: /linux-6.15/fs/dlm/lockspace.c (revision 1ffefc19)
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;

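/*
 * Per-lockspace sysfs attributes (collected in dlm_attrs[] below), exposed
 * under /sys/kernel/dlm/<lockspace>/.  dlm_controld in userspace writes
 * "0"/"1" to "control" to stop/start a lockspace, and writes its
 * group-management result to "event_done" to wake do_uevent().
 */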
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
{
	const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

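/*
 * Module init/exit for the lockspace layer: set up the global lockspace
 * list and create the "dlm" kset under kernel_kobj, i.e. /sys/kernel/dlm,
 * which is where each lockspace kobject (and its attributes above) appears.
 */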
int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: can not create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

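/*
 * Lockspace lookup: each dlm_find_lockspace_*() variant takes a reference
 * on ls_count that the caller must drop with dlm_put_lockspace().  The
 * global-id and device-minor lookups walk lslist under lslist_lock; the
 * "local" variant trusts the handle it is given and only bumps the count.
 */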
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls = lockspace;

	atomic_inc(&ls->ls_count);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			atomic_inc(&ls->ls_count);
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock_bh(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	if (atomic_dec_and_test(&ls->ls_count))
		wake_up(&ls->ls_count_wait);
}

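/*
 * Wait for all references to drain, then unlink ls from lslist.  The
 * count is rechecked under lslist_lock because a concurrent
 * dlm_find_lockspace_*() may have taken a new reference between the
 * wait_event() and taking the lock; if so, go back and wait again.
 */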
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
	wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

	spin_lock_bh(&lslist_lock);
	if (atomic_read(&ls->ls_count) != 0) {
		spin_unlock_bh(&lslist_lock);
		goto retry;
	}

	WARN_ON(ls->ls_create_count != 0);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
}

static int threads_start(void)
{
	int error;

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_midcomms_start();
	if (error)
		log_print("cannot start dlm midcomms %d", error);

	return error;
}

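/*
 * Create a new lockspace, or attach to an existing one of the same name.
 * Returns 0 with a newly created lockspace in *lockspace, 1 if an existing
 * lockspace was found and its create count bumped (the caller,
 * __dlm_new_lockspace(), treats any positive return as success), or a
 * negative errno.
 */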
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int do_unreg = 0;
	int namelen = strlen(name);
	int error;

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (lvblen % 8)
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock_bh(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(*ls), GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	atomic_set(&ls->ls_count, 0);
	init_waitqueue_head(&ls->ls_count_wait);
	ls->ls_flags = 0;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	/* ls_exflags are forced to match among nodes, and we don't
	 * need to require all nodes to have some flags set
	 */
	ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));

	INIT_LIST_HEAD(&ls->ls_toss);
	INIT_LIST_HEAD(&ls->ls_keep);
	rwlock_init(&ls->ls_rsbtbl_lock);

	error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params);
	if (error)
		goto out_lsfree;

	idr_init(&ls->ls_lkbidr);
	rwlock_init(&ls->ls_lkbidr_lock);

	INIT_LIST_HEAD(&ls->ls_waiters);
	spin_lock_init(&ls->ls_waiters_lock);
	INIT_LIST_HEAD(&ls->ls_orphans);
	spin_lock_init(&ls->ls_orphans_lock);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_local_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_recovery_done);
	ls->ls_recovery_result = -1;

	spin_lock_init(&ls->ls_cb_lock);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = get_random_u64();
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	rwlock_init(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	rwlock_init(&ls->ls_requestqueue_lock);
	spin_lock_init(&ls->ls_clear_proc_locks);
	/* Due to backwards compatibility with 3.1 we need to use the maximum
	 * possible dlm message size to be sure the message will fit and we
	 * won't run into out of bounds issues.  However, a 3.2 sending side
	 * might send less.
	 */
	ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
	if (!ls->ls_recover_buf) {
		error = -ENOMEM;
		goto out_lkbidr;
	}

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_masters_list);
	rwlock_init(&ls->ls_masters_lock);
	INIT_LIST_HEAD(&ls->ls_dir_dump_list);
	rwlock_init(&ls->ls_dir_dump_lock);

	INIT_LIST_HEAD(&ls->ls_toss_q);
	spin_lock_init(&ls->ls_toss_q_lock);
	timer_setup(&ls->ls_timer, dlm_rsb_toss_timer,
		    TIMER_DEFERRABLE);

	spin_lock_bh(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock_bh(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	/* wait until recovery is successful or failed */
	wait_for_completion(&ls->ls_recovery_done);
	error = ls->ls_recovery_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock_bh(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock_bh(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
	rhashtable_destroy(&ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

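/*
 * ls_count tracks how many lockspaces exist module-wide: the first
 * creation starts the midcomms layer via threads_start(), and if a
 * creation fails while no lockspaces remain, midcomms is shut down and
 * stopped again.  The ls_lock mutex serializes creation and release with
 * that thread management.
 */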
static int __dlm_new_lockspace(const char *name, const char *cluster,
			       uint32_t flags, int lvblen,
			       const struct dlm_lockspace_ops *ops,
			       void *ops_arg, int *ops_result,
			       dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count) {
		dlm_midcomms_shutdown();
		dlm_midcomms_stop();
	}
 out:
	mutex_unlock(&ls_lock);
	return error;
}

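/*
 * Kernel API entry point: dlm_new_lockspace() forces DLM_LSFL_FS, which
 * is what makes new_lockspace() run dlm_callback_start() for the
 * lockspace.  dlm_new_user_lockspace() passes the flags through unchanged
 * for lockspaces created on behalf of userspace.
 */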
int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
		      int lvblen, const struct dlm_lockspace_ops *ops,
		      void *ops_arg, int *ops_result,
		      dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
				   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
			   uint32_t flags, int lvblen,
			   const struct dlm_lockspace_ops *ops,
			   void *ops_arg, int *ops_result,
			   dlm_lockspace_t **lockspace)
{
	return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
				   ops_arg, ops_result, lockspace);
}

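/*
 * idr_for_each() callbacks used by lockspace_busy() and
 * release_lockspace(): "is_local" matches lkbs that are locally owned
 * (lkb_nodeid == 0) and granted, "is_any" matches everything, and "free"
 * releases an lkb (and its lvb for master copies) during final teardown.
 */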
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	read_lock_bh(&ls->ls_lkbidr_lock);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	read_unlock_bh(&ls->ls_lkbidr_lock);
	return rv;
}

static void rhash_free_rsb(void *ptr, void *arg)
{
	struct dlm_rsb *rsb = ptr;

	dlm_free_rsb(rsb);
}

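/*
 * Teardown proper: drop the create count under lslist_lock, and only when
 * it reaches zero (and the lockspace isn't busy) continue with the full
 * shutdown sequence: notify dlm_controld via uevent, stop recoverd and
 * callback handling, unlink from lslist, then free all lkbs, rsbs and
 * member state.  The dlm_ls itself is freed by lockspace_kobj_release()
 * once the final kobject_put() runs.
 */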
static int release_lockspace(struct dlm_ls *ls, int force)
{
	int busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock_bh(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock_bh(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	if (ls_count == 1)
		dlm_midcomms_version_wait();

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* clear the LSFL_RUNNING flag to speed up
	 * timer_shutdown_sync(); we don't care anymore
	 */
	clear_bit(LSFL_RUNNING, &ls->ls_flags);
	timer_shutdown_sync(&ls->ls_timer);

	if (ls_count == 1) {
		dlm_clear_members(ls);
		dlm_midcomms_shutdown();
	}

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl
	 */
	rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL);

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

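/*
 * Illustrative only (not part of the in-tree callers; the lockspace and
 * cluster names here are hypothetical): a kernel user that created a
 * lockspace with dlm_new_lockspace() might tear it down like this,
 * forcing destruction regardless of remaining LKBs:
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("myfs", "mycluster", 0, 32,
 *				  NULL, NULL, NULL, &ls);
 *	if (!error)
 *		error = dlm_release_lockspace(ls, 2);
 */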
int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		dlm_midcomms_stop();
	mutex_unlock(&ls_lock);

	return error;
}

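/*
 * Called when the userland control daemon (dlm_controld) has gone away:
 * stop every lockspace that is still running.  The list lock is dropped
 * around dlm_ls_stop(), so the scan restarts from the top after each stop.
 */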
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock_bh(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock_bh(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock_bh(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}