Connection.cxx 13.1 KB
Newer Older
1
/*
2
 * Copyright 2003-2016 The Music Player Daemon Project
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Connection.hxx"
#include "Lease.hxx"
#include "Callback.hxx"
24
#include "event/Loop.hxx"
25
#include "system/fd_util.h"
26
#include "util/RuntimeError.hxx"
27 28 29 30 31 32 33

extern "C" {
#include <nfsc/libnfs.h>
}

#include <utility>

34 35
#include <poll.h> /* for POLLIN, POLLOUT */

36 37
static constexpr unsigned NFS_MOUNT_TIMEOUT = 60;

38
inline void
39
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
40
					 const char *path)
41
{
42 43
	assert(connection.GetEventLoop().IsInside());

44
	int result = nfs_stat_async(ctx, path, Callback, this);
45 46 47
	if (result < 0)
		throw FormatRuntimeError("nfs_stat_async() failed: %s",
					 nfs_get_error(ctx));
48 49
}

50
inline void
51
NfsConnection::CancellableCallback::OpenDirectory(nfs_context *ctx,
52
						  const char *path)
53
{
54 55
	assert(connection.GetEventLoop().IsInside());

56
	int result = nfs_opendir_async(ctx, path, Callback, this);
57 58 59
	if (result < 0)
		throw FormatRuntimeError("nfs_opendir_async() failed: %s",
					 nfs_get_error(ctx));
60 61
}

62
inline void
63
NfsConnection::CancellableCallback::Open(nfs_context *ctx,
64
					 const char *path, int flags)
65
{
66 67
	assert(connection.GetEventLoop().IsInside());

68 69
	int result = nfs_open_async(ctx, path, flags,
				    Callback, this);
70 71 72
	if (result < 0)
		throw FormatRuntimeError("nfs_open_async() failed: %s",
					 nfs_get_error(ctx));
73 74
}

75
inline void
76
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
77
					 struct nfsfh *fh)
78
{
79 80
	assert(connection.GetEventLoop().IsInside());

81
	int result = nfs_fstat_async(ctx, fh, Callback, this);
82 83 84
	if (result < 0)
		throw FormatRuntimeError("nfs_fstat_async() failed: %s",
					 nfs_get_error(ctx));
85 86
}

87
inline void
88
NfsConnection::CancellableCallback::Read(nfs_context *ctx, struct nfsfh *fh,
89
					 uint64_t offset, size_t size)
90
{
91 92
	assert(connection.GetEventLoop().IsInside());

93
	int result = nfs_pread_async(ctx, fh, offset, size, Callback, this);
94 95 96
	if (result < 0)
		throw FormatRuntimeError("nfs_pread_async() failed: %s",
					 nfs_get_error(ctx));
97 98
}

99 100 101
inline void
NfsConnection::CancellableCallback::CancelAndScheduleClose(struct nfsfh *fh)
{
102
	assert(connection.GetEventLoop().IsInside());
103 104 105 106 107 108 109 110
	assert(!open);
	assert(close_fh == nullptr);
	assert(fh != nullptr);

	close_fh = fh;
	Cancel();
}

111 112 113 114 115 116 117 118 119 120 121
inline void
NfsConnection::CancellableCallback::PrepareDestroyContext()
{
	assert(IsCancelled());

	if (close_fh != nullptr) {
		connection.InternalClose(close_fh);
		close_fh = nullptr;
	}
}

122 123 124
inline void
NfsConnection::CancellableCallback::Callback(int err, void *data)
{
125 126
	assert(connection.GetEventLoop().IsInside());

127
	if (!IsCancelled()) {
128 129
		assert(close_fh == nullptr);

130 131 132 133 134 135 136
		NfsCallback &cb = Get();

		connection.callbacks.Remove(*this);

		if (err >= 0)
			cb.OnNfsCallback((unsigned)err, data);
		else
137
			cb.OnNfsError(std::make_exception_ptr(std::runtime_error((const char *)data)));
138
	} else {
139 140 141 142
		if (open) {
			/* a nfs_open_async() call was cancelled - to
			   avoid a memory leak, close the newly
			   allocated file handle immediately */
143 144
			assert(close_fh == nullptr);

145 146 147 148
			if (err >= 0) {
				struct nfsfh *fh = (struct nfsfh *)data;
				connection.Close(fh);
			}
149 150
		} else if (close_fh != nullptr)
			connection.DeferClose(close_fh);
151

152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
		connection.callbacks.Remove(*this);
	}
}

void
NfsConnection::CancellableCallback::Callback(int err,
					     gcc_unused struct nfs_context *nfs,
					     void *data, void *private_data)
{
	CancellableCallback &c = *(CancellableCallback *)private_data;
	c.Callback(err, data);
}

static constexpr unsigned
libnfs_to_events(int i)
{
	return ((i & POLLIN) ? SocketMonitor::READ : 0) |
		((i & POLLOUT) ? SocketMonitor::WRITE : 0);
}

static constexpr int
events_to_libnfs(unsigned i)
{
	return ((i & SocketMonitor::READ) ? POLLIN : 0) |
		((i & SocketMonitor::WRITE) ? POLLOUT : 0);
}

NfsConnection::~NfsConnection()
{
181
	assert(GetEventLoop().IsInside());
182 183 184
	assert(new_leases.empty());
	assert(active_leases.empty());
	assert(callbacks.IsEmpty());
185
	assert(deferred_close.empty());
186 187

	if (context != nullptr)
188
		DestroyContext();
189 190 191 192 193
}

void
NfsConnection::AddLease(NfsLease &lease)
{
194 195 196
	assert(GetEventLoop().IsInside());

	new_leases.push_back(&lease);
197 198 199 200 201 202 203

	DeferredMonitor::Schedule();
}

void
NfsConnection::RemoveLease(NfsLease &lease)
{
204
	assert(GetEventLoop().IsInside());
205 206 207 208 209

	new_leases.remove(&lease);
	active_leases.remove(&lease);
}

210 211
void
NfsConnection::Stat(const char *path, NfsCallback &callback)
212
{
213
	assert(GetEventLoop().IsInside());
214 215 216
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, false);
217 218 219
	try {
		c.Stat(context, path);
	} catch (...) {
220
		callbacks.Remove(c);
221
		throw;
222 223 224 225 226
	}

	ScheduleSocket();
}

227 228
void
NfsConnection::OpenDirectory(const char *path, NfsCallback &callback)
229
{
230
	assert(GetEventLoop().IsInside());
231 232 233
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, true);
234 235 236
	try {
		c.OpenDirectory(context, path);
	} catch (...) {
237
		callbacks.Remove(c);
238
		throw;
239 240 241 242 243 244 245 246
	}

	ScheduleSocket();
}

const struct nfsdirent *
NfsConnection::ReadDirectory(struct nfsdir *dir)
{
247 248
	assert(GetEventLoop().IsInside());

249 250 251 252 253 254
	return nfs_readdir(context, dir);
}

void
NfsConnection::CloseDirectory(struct nfsdir *dir)
{
255 256
	assert(GetEventLoop().IsInside());

257 258 259
	return nfs_closedir(context, dir);
}

260 261
void
NfsConnection::Open(const char *path, int flags, NfsCallback &callback)
262
{
263
	assert(GetEventLoop().IsInside());
264 265
	assert(!callbacks.Contains(callback));

266
	auto &c = callbacks.Add(callback, *this, true);
267 268 269
	try {
		c.Open(context, path, flags);
	} catch (...) {
270
		callbacks.Remove(c);
271
		throw;
272 273 274 275 276
	}

	ScheduleSocket();
}

277 278
void
NfsConnection::Stat(struct nfsfh *fh, NfsCallback &callback)
279
{
280
	assert(GetEventLoop().IsInside());
281 282
	assert(!callbacks.Contains(callback));

283
	auto &c = callbacks.Add(callback, *this, false);
284 285 286
	try {
		c.Stat(context, fh);
	} catch (...) {
287
		callbacks.Remove(c);
288
		throw;
289 290 291 292 293
	}

	ScheduleSocket();
}

294
void
295
NfsConnection::Read(struct nfsfh *fh, uint64_t offset, size_t size,
296
		    NfsCallback &callback)
297
{
298
	assert(GetEventLoop().IsInside());
299 300
	assert(!callbacks.Contains(callback));

301
	auto &c = callbacks.Add(callback, *this, false);
302 303 304
	try {
		c.Read(context, fh, offset, size);
	} catch (...) {
305
		callbacks.Remove(c);
306
		throw;
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
	}

	ScheduleSocket();
}

void
NfsConnection::Cancel(NfsCallback &callback)
{
	callbacks.Cancel(callback);
}

static void
DummyCallback(int, struct nfs_context *, void *, void *)
{
}

323 324 325 326 327 328 329 330 331 332
inline void
NfsConnection::InternalClose(struct nfsfh *fh)
{
	assert(GetEventLoop().IsInside());
	assert(context != nullptr);
	assert(fh != nullptr);

	nfs_close_async(context, fh, DummyCallback, nullptr);
}

333 334 335
void
NfsConnection::Close(struct nfsfh *fh)
{
336 337
	assert(GetEventLoop().IsInside());

338
	InternalClose(fh);
339 340 341
	ScheduleSocket();
}

342 343 344 345 346 347 348
void
NfsConnection::CancelAndClose(struct nfsfh *fh, NfsCallback &callback)
{
	CancellableCallback &cancel = callbacks.Get(callback);
	cancel.CancelAndScheduleClose(fh);
}

349 350 351
void
NfsConnection::DestroyContext()
{
352
	assert(GetEventLoop().IsInside());
353 354
	assert(context != nullptr);

355 356 357 358 359
#ifndef NDEBUG
	assert(!in_destroy);
	in_destroy = true;
#endif

360 361 362 363 364
	if (!mount_finished) {
		assert(TimeoutMonitor::IsActive());
		TimeoutMonitor::Cancel();
	}

365 366 367 368
	/* cancel pending DeferredMonitor that was scheduled to notify
	   new leases */
	DeferredMonitor::Cancel();

369
	if (SocketMonitor::IsDefined())
370
		SocketMonitor::Steal();
371

372 373 374 375
	callbacks.ForEach([](CancellableCallback &c){
			c.PrepareDestroyContext();
		});

376 377 378 379
	nfs_destroy_context(context);
	context = nullptr;
}

380 381 382
inline void
NfsConnection::DeferClose(struct nfsfh *fh)
{
383
	assert(GetEventLoop().IsInside());
384 385
	assert(in_event);
	assert(in_service);
386 387
	assert(context != nullptr);
	assert(fh != nullptr);
388 389 390 391

	deferred_close.push_front(fh);
}

392 393 394
void
NfsConnection::ScheduleSocket()
{
395
	assert(GetEventLoop().IsInside());
396 397 398 399
	assert(context != nullptr);

	if (!SocketMonitor::IsDefined()) {
		int _fd = nfs_get_fd(context);
400 401 402
		if (_fd < 0)
			return;

403 404 405 406 407 408 409
		fd_set_cloexec(_fd, true);
		SocketMonitor::Open(_fd);
	}

	SocketMonitor::Schedule(libnfs_to_events(nfs_which_events(context)));
}

410 411
inline int
NfsConnection::Service(unsigned flags)
412
{
413
	assert(GetEventLoop().IsInside());
414
	assert(context != nullptr);
415

416
#ifndef NDEBUG
417 418 419 420 421
	assert(!in_event);
	in_event = true;

	assert(!in_service);
	in_service = true;
422
#endif
423 424 425

	int result = nfs_service(context, events_to_libnfs(flags));

426
#ifndef NDEBUG
427 428 429
	assert(context != nullptr);
	assert(in_service);
	in_service = false;
430
#endif
431

432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
	return result;
}

bool
NfsConnection::OnSocketReady(unsigned flags)
{
	assert(GetEventLoop().IsInside());
	assert(deferred_close.empty());

	bool closed = false;

	const bool was_mounted = mount_finished;
	if (!mount_finished)
		/* until the mount is finished, the NFS client may use
		   various sockets, therefore we unregister and
		   re-register it each time */
		SocketMonitor::Steal();

	const int result = Service(flags);

452
	while (!deferred_close.empty()) {
453
		InternalClose(deferred_close.front());
454 455 456
		deferred_close.pop_front();
	}

457
	if (!was_mounted && mount_finished) {
458
		if (postponed_mount_error) {
459 460 461 462 463 464 465 466
			DestroyContext();
			closed = true;
			BroadcastMountError(std::move(postponed_mount_error));
		} else if (result == 0)
			BroadcastMountSuccess();
	} else if (result < 0) {
		/* the connection has failed */

467 468 469
		auto e = FormatRuntimeError("NFS connection has failed: %s",
					    nfs_get_error(context));
		BroadcastError(std::make_exception_ptr(e));
470

471 472
		DestroyContext();
		closed = true;
473
	} else if (nfs_get_fd(context) < 0) {
474
		/* this happens when rpc_reconnect_requeue() is called
475
		   after the connection broke, but autoreconnect was
476
		   disabled - nfs_service() returns 0 */
477

478 479
		const char *msg = nfs_get_error(context);
		if (msg == nullptr)
480 481
			msg = "<unknown>";
		auto e = FormatRuntimeError("NFS socket disappeared: %s", msg);
482

483
		BroadcastError(std::make_exception_ptr(e));
484

485 486
		DestroyContext();
		closed = true;
487 488
	}

489 490
	assert(context == nullptr || nfs_get_fd(context) >= 0);

491
#ifndef NDEBUG
492 493
	assert(in_event);
	in_event = false;
494
#endif
495 496 497 498 499 500 501 502 503 504 505

	if (context != nullptr)
		ScheduleSocket();

	return !closed;
}

inline void
NfsConnection::MountCallback(int status, gcc_unused nfs_context *nfs,
			     gcc_unused void *data)
{
506
	assert(GetEventLoop().IsInside());
507 508 509 510
	assert(context == nfs);

	mount_finished = true;

511 512 513
	assert(TimeoutMonitor::IsActive() || in_destroy);
	TimeoutMonitor::Cancel();

514
	if (status < 0) {
515 516 517
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
		postponed_mount_error = std::make_exception_ptr(e);
518 519 520 521 522 523 524 525 526 527 528 529 530
		return;
	}
}

void
NfsConnection::MountCallback(int status, nfs_context *nfs, void *data,
			     void *private_data)
{
	NfsConnection *c = (NfsConnection *)private_data;

	c->MountCallback(status, nfs, data);
}

531 532
inline void
NfsConnection::MountInternal()
533
{
534
	assert(GetEventLoop().IsInside());
535
	assert(context == nullptr);
536 537

	context = nfs_init_context();
538 539
	if (context == nullptr)
		throw std::runtime_error("nfs_init_context() failed");
540

541
	postponed_mount_error = std::exception_ptr();
542
	mount_finished = false;
543

544 545
	TimeoutMonitor::ScheduleSeconds(NFS_MOUNT_TIMEOUT);

546
#ifndef NDEBUG
547 548
	in_service = false;
	in_event = false;
549
	in_destroy = false;
550
#endif
551 552 553

	if (nfs_mount_async(context, server.c_str(), export_name.c_str(),
			    MountCallback, this) != 0) {
554 555
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
556 557
		nfs_destroy_context(context);
		context = nullptr;
558
		throw e;
559 560 561 562 563 564 565 566
	}

	ScheduleSocket();
}

void
NfsConnection::BroadcastMountSuccess()
{
567 568
	assert(GetEventLoop().IsInside());

569 570 571 572 573 574 575 576
	while (!new_leases.empty()) {
		auto i = new_leases.begin();
		active_leases.splice(active_leases.end(), new_leases, i);
		(*i)->OnNfsConnectionReady();
	}
}

void
577
NfsConnection::BroadcastMountError(std::exception_ptr &&e)
578
{
579 580
	assert(GetEventLoop().IsInside());

581 582 583
	while (!new_leases.empty()) {
		auto l = new_leases.front();
		new_leases.pop_front();
584
		l->OnNfsConnectionFailed(e);
585 586
	}

587
	OnNfsConnectionError(std::move(e));
588 589 590
}

void
591
NfsConnection::BroadcastError(std::exception_ptr &&e)
592
{
593 594
	assert(GetEventLoop().IsInside());

595 596 597
	while (!active_leases.empty()) {
		auto l = active_leases.front();
		active_leases.pop_front();
598
		l->OnNfsConnectionDisconnected(e);
599 600
	}

601
	BroadcastMountError(std::move(e));
602 603
}

604 605 606 607 608 609 610 611 612
void
NfsConnection::OnTimeout()
{
	assert(GetEventLoop().IsInside());
	assert(!mount_finished);

	mount_finished = true;
	DestroyContext();

613
	BroadcastMountError(std::make_exception_ptr(std::runtime_error("Mount timeout")));
614 615
}

616 617 618
void
NfsConnection::RunDeferred()
{
619 620
	assert(GetEventLoop().IsInside());

621
	if (context == nullptr) {
622 623 624 625
		try {
			MountInternal();
		} catch (...) {
			BroadcastMountError(std::current_exception());
626 627 628 629
			return;
		}
	}

630
	if (mount_finished)
631 632
		BroadcastMountSuccess();
}