/*
 * Copyright 2003-2017 The Music Player Daemon Project
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "config.h"
#include "Connection.hxx"
22
#include "Error.hxx"
23 24
#include "Lease.hxx"
#include "Callback.hxx"
25
#include "event/Loop.hxx"
26
#include "net/SocketDescriptor.hxx"
27
#include "util/RuntimeError.hxx"
28 29 30 31 32 33 34

extern "C" {
#include <nfsc/libnfs.h>
}

#include <utility>

35 36 37
#ifdef _WIN32
#include <winsock2.h>
#else
38
#include <poll.h> /* for POLLIN, POLLOUT */
39
#endif
40

41 42
/**
 * How long a mount attempt may take: MountInternal() arms
 * mount_timeout_event with this duration.
 */
static constexpr std::chrono::steady_clock::duration NFS_MOUNT_TIMEOUT =
	std::chrono::minutes(1);
43

44
inline void
45
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
46
					 const char *path)
47
{
48 49
	assert(connection.GetEventLoop().IsInside());

50
	int result = nfs_stat64_async(ctx, path, Callback, this);
51
	if (result < 0)
52
		throw FormatRuntimeError("nfs_stat64_async() failed: %s",
53
					 nfs_get_error(ctx));
54 55
}

56 57 58 59 60 61 62 63 64 65 66 67
/**
 * Submit an asynchronous nfs_lstat64_async() request for the given
 * path, registering this object as the completion callback.
 *
 * Must be called from inside the connection's event loop.
 *
 * Throws the exception built by FormatRuntimeError() if libnfs
 * rejects the request.
 */
inline void
NfsConnection::CancellableCallback::Lstat(nfs_context *ctx,
					  const char *path)
{
	assert(connection.GetEventLoop().IsInside());

	const int ret = nfs_lstat64_async(ctx, path, Callback, this);
	if (ret >= 0)
		return;

	throw FormatRuntimeError("nfs_lstat64_async() failed: %s",
				 nfs_get_error(ctx));
}

68
inline void
69
NfsConnection::CancellableCallback::OpenDirectory(nfs_context *ctx,
70
						  const char *path)
71
{
72 73
	assert(connection.GetEventLoop().IsInside());

74
	int result = nfs_opendir_async(ctx, path, Callback, this);
75 76 77
	if (result < 0)
		throw FormatRuntimeError("nfs_opendir_async() failed: %s",
					 nfs_get_error(ctx));
78 79
}

80
inline void
81
NfsConnection::CancellableCallback::Open(nfs_context *ctx,
82
					 const char *path, int flags)
83
{
84 85
	assert(connection.GetEventLoop().IsInside());

86 87
	int result = nfs_open_async(ctx, path, flags,
				    Callback, this);
88 89 90
	if (result < 0)
		throw FormatRuntimeError("nfs_open_async() failed: %s",
					 nfs_get_error(ctx));
91 92
}

93
inline void
94
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
95
					 struct nfsfh *fh)
96
{
97 98
	assert(connection.GetEventLoop().IsInside());

99
	int result = nfs_fstat_async(ctx, fh, Callback, this);
100 101 102
	if (result < 0)
		throw FormatRuntimeError("nfs_fstat_async() failed: %s",
					 nfs_get_error(ctx));
103 104
}

105
inline void
106
NfsConnection::CancellableCallback::Read(nfs_context *ctx, struct nfsfh *fh,
107
					 uint64_t offset, size_t size)
108
{
109 110
	assert(connection.GetEventLoop().IsInside());

111
	int result = nfs_pread_async(ctx, fh, offset, size, Callback, this);
112 113 114
	if (result < 0)
		throw FormatRuntimeError("nfs_pread_async() failed: %s",
					 nfs_get_error(ctx));
115 116
}

117
inline void
118
NfsConnection::CancellableCallback::CancelAndScheduleClose(struct nfsfh *fh) noexcept
119
{
120
	assert(connection.GetEventLoop().IsInside());
121 122 123 124 125 126 127 128
	assert(!open);
	assert(close_fh == nullptr);
	assert(fh != nullptr);

	close_fh = fh;
	Cancel();
}

129
inline void
130
NfsConnection::CancellableCallback::PrepareDestroyContext() noexcept
131 132 133 134 135 136 137 138 139
{
	assert(IsCancelled());

	if (close_fh != nullptr) {
		connection.InternalClose(close_fh);
		close_fh = nullptr;
	}
}

140
inline void
141
NfsConnection::CancellableCallback::Callback(int err, void *data) noexcept
142
{
143 144
	assert(connection.GetEventLoop().IsInside());

145
	if (!IsCancelled()) {
146 147
		assert(close_fh == nullptr);

148 149 150 151 152 153 154
		NfsCallback &cb = Get();

		connection.callbacks.Remove(*this);

		if (err >= 0)
			cb.OnNfsCallback((unsigned)err, data);
		else
155
			cb.OnNfsError(std::make_exception_ptr(NfsClientError(-err, (const char *)data)));
156
	} else {
157 158 159 160
		if (open) {
			/* a nfs_open_async() call was cancelled - to
			   avoid a memory leak, close the newly
			   allocated file handle immediately */
161 162
			assert(close_fh == nullptr);

163 164 165 166
			if (err >= 0) {
				struct nfsfh *fh = (struct nfsfh *)data;
				connection.Close(fh);
			}
167 168
		} else if (close_fh != nullptr)
			connection.DeferClose(close_fh);
169

170 171 172 173 174 175 176
		connection.callbacks.Remove(*this);
	}
}

void
NfsConnection::CancellableCallback::Callback(int err,
					     gcc_unused struct nfs_context *nfs,
177 178
					     void *data,
					     void *private_data) noexcept
179 180 181 182 183 184
{
	CancellableCallback &c = *(CancellableCallback *)private_data;
	c.Callback(err, data);
}

static constexpr unsigned
185
libnfs_to_events(int i) noexcept
186 187 188 189 190 191
{
	return ((i & POLLIN) ? SocketMonitor::READ : 0) |
		((i & POLLOUT) ? SocketMonitor::WRITE : 0);
}

static constexpr int
192
events_to_libnfs(unsigned i) noexcept
193 194 195 196 197
{
	return ((i & SocketMonitor::READ) ? POLLIN : 0) |
		((i & SocketMonitor::WRITE) ? POLLOUT : 0);
}

198
/**
 * Destructor.  Must run inside the event loop, after all leases,
 * callbacks and deferred closes are gone (asserted).  Destroys the
 * libnfs context if one still exists.
 */
NfsConnection::~NfsConnection() noexcept
{
	assert(GetEventLoop().IsInside());
	assert(new_leases.empty());
	assert(active_leases.empty());
	assert(callbacks.IsEmpty());
	assert(deferred_close.empty());

	if (context == nullptr)
		return;

	DestroyContext();
}

void
211
NfsConnection::AddLease(NfsLease &lease) noexcept
212
{
213 214 215
	assert(GetEventLoop().IsInside());

	new_leases.push_back(&lease);
216

217
	defer_new_lease.Schedule();
218 219 220
}

void
221
NfsConnection::RemoveLease(NfsLease &lease) noexcept
222
{
223
	assert(GetEventLoop().IsInside());
224 225 226 227 228

	new_leases.remove(&lease);
	active_leases.remove(&lease);
}

229 230
void
NfsConnection::Stat(const char *path, NfsCallback &callback)
231
{
232
	assert(GetEventLoop().IsInside());
233 234 235
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, false);
236 237 238
	try {
		c.Stat(context, path);
	} catch (...) {
239
		callbacks.Remove(c);
240
		throw;
241 242 243 244 245
	}

	ScheduleSocket();
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
/**
 * Start an asynchronous lstat on `path`; the result is delivered to
 * `callback`.
 *
 * The callback is registered first.  If submitting the request
 * throws, the registration is undone and the exception is rethrown.
 * On success, the socket is rescheduled.
 *
 * Must be called from inside the event loop; `callback` must not
 * already be registered (asserted).
 */
void
NfsConnection::Lstat(const char *path, NfsCallback &callback)
{
	assert(GetEventLoop().IsInside());
	assert(!callbacks.Contains(callback));

	auto &pending = callbacks.Add(callback, *this, false);

	try {
		pending.Lstat(context, path);
	} catch (...) {
		/* roll back the registration */
		callbacks.Remove(pending);
		throw;
	}

	ScheduleSocket();
}

263 264
void
NfsConnection::OpenDirectory(const char *path, NfsCallback &callback)
265
{
266
	assert(GetEventLoop().IsInside());
267 268 269
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, true);
270 271 272
	try {
		c.OpenDirectory(context, path);
	} catch (...) {
273
		callbacks.Remove(c);
274
		throw;
275 276 277 278 279 280
	}

	ScheduleSocket();
}

const struct nfsdirent *
281
NfsConnection::ReadDirectory(struct nfsdir *dir) noexcept
282
{
283 284
	assert(GetEventLoop().IsInside());

285 286 287 288
	return nfs_readdir(context, dir);
}

void
289
NfsConnection::CloseDirectory(struct nfsdir *dir) noexcept
290
{
291 292
	assert(GetEventLoop().IsInside());

293 294 295
	return nfs_closedir(context, dir);
}

296 297
void
NfsConnection::Open(const char *path, int flags, NfsCallback &callback)
298
{
299
	assert(GetEventLoop().IsInside());
300 301
	assert(!callbacks.Contains(callback));

302
	auto &c = callbacks.Add(callback, *this, true);
303 304 305
	try {
		c.Open(context, path, flags);
	} catch (...) {
306
		callbacks.Remove(c);
307
		throw;
308 309 310 311 312
	}

	ScheduleSocket();
}

313 314
void
NfsConnection::Stat(struct nfsfh *fh, NfsCallback &callback)
315
{
316
	assert(GetEventLoop().IsInside());
317 318
	assert(!callbacks.Contains(callback));

319
	auto &c = callbacks.Add(callback, *this, false);
320 321 322
	try {
		c.Stat(context, fh);
	} catch (...) {
323
		callbacks.Remove(c);
324
		throw;
325 326 327 328 329
	}

	ScheduleSocket();
}

330
void
331
NfsConnection::Read(struct nfsfh *fh, uint64_t offset, size_t size,
332
		    NfsCallback &callback)
333
{
334
	assert(GetEventLoop().IsInside());
335 336
	assert(!callbacks.Contains(callback));

337
	auto &c = callbacks.Add(callback, *this, false);
338 339 340
	try {
		c.Read(context, fh, offset, size);
	} catch (...) {
341
		callbacks.Remove(c);
342
		throw;
343 344 345 346 347 348
	}

	ScheduleSocket();
}

void
349
NfsConnection::Cancel(NfsCallback &callback) noexcept
350 351 352 353 354
{
	callbacks.Cancel(callback);
}

/**
 * A libnfs completion callback that ignores its arguments; used by
 * InternalClose() where the result of the close is of no interest.
 */
static void
DummyCallback(int, struct nfs_context *, void *, void *) noexcept
{
	/* intentionally empty */
}

359
inline void
360
NfsConnection::InternalClose(struct nfsfh *fh) noexcept
361 362 363 364 365 366 367 368
{
	assert(GetEventLoop().IsInside());
	assert(context != nullptr);
	assert(fh != nullptr);

	nfs_close_async(context, fh, DummyCallback, nullptr);
}

369
void
370
NfsConnection::Close(struct nfsfh *fh) noexcept
371
{
372 373
	assert(GetEventLoop().IsInside());

374
	InternalClose(fh);
375 376 377
	ScheduleSocket();
}

378
void
379
NfsConnection::CancelAndClose(struct nfsfh *fh, NfsCallback &callback) noexcept
380 381 382 383 384
{
	CancellableCallback &cancel = callbacks.Get(callback);
	cancel.CancelAndScheduleClose(fh);
}

385
void
386
NfsConnection::DestroyContext() noexcept
387
{
388
	assert(GetEventLoop().IsInside());
389 390
	assert(context != nullptr);

391 392 393 394 395
#ifndef NDEBUG
	assert(!in_destroy);
	in_destroy = true;
#endif

396
	if (!mount_finished) {
397 398
		assert(mount_timeout_event.IsActive());
		mount_timeout_event.Cancel();
399 400
	}

401
	/* cancel pending DeferEvent that was scheduled to notify
402
	   new leases */
403
	defer_new_lease.Cancel();
404

405
	if (SocketMonitor::IsDefined())
406
		SocketMonitor::Steal();
407

408 409 410 411
	callbacks.ForEach([](CancellableCallback &c){
			c.PrepareDestroyContext();
		});

412 413 414 415
	nfs_destroy_context(context);
	context = nullptr;
}

416
inline void
417
NfsConnection::DeferClose(struct nfsfh *fh) noexcept
418
{
419
	assert(GetEventLoop().IsInside());
420 421
	assert(in_event);
	assert(in_service);
422 423
	assert(context != nullptr);
	assert(fh != nullptr);
424 425 426 427

	deferred_close.push_front(fh);
}

428
void
429
NfsConnection::ScheduleSocket() noexcept
430
{
431
	assert(GetEventLoop().IsInside());
432 433
	assert(context != nullptr);

434 435 436 437 438 439 440 441 442 443 444
	const int which_events = nfs_which_events(context);

	if (which_events == POLLOUT && SocketMonitor::IsDefined())
		/* kludge: if libnfs asks only for POLLOUT, it means
		   that it is currently waiting for the connect() to
		   finish - rpc_reconnect_requeue() may have been
		   called from inside nfs_service(); we must now
		   unregister the old socket and register the new one
		   instead */
		SocketMonitor::Steal();

445
	if (!SocketMonitor::IsDefined()) {
446 447
		SocketDescriptor _fd(nfs_get_fd(context));
		if (!_fd.IsDefined())
448 449
			return;

450
		_fd.EnableCloseOnExec();
451
		SocketMonitor::Open(_fd);
452 453
	}

454 455
	SocketMonitor::Schedule(libnfs_to_events(which_events)
				| SocketMonitor::HANGUP);
456 457
}

458
inline int
459
NfsConnection::Service(unsigned flags) noexcept
460
{
461
	assert(GetEventLoop().IsInside());
462
	assert(context != nullptr);
463

464
#ifndef NDEBUG
465 466 467 468 469
	assert(!in_event);
	in_event = true;

	assert(!in_service);
	in_service = true;
470
#endif
471 472 473

	int result = nfs_service(context, events_to_libnfs(flags));

474
#ifndef NDEBUG
475 476 477
	assert(context != nullptr);
	assert(in_service);
	in_service = false;
478
#endif
479

480 481 482 483
	return result;
}

bool
484
NfsConnection::OnSocketReady(unsigned flags) noexcept
485 486 487 488 489 490 491
{
	assert(GetEventLoop().IsInside());
	assert(deferred_close.empty());

	bool closed = false;

	const bool was_mounted = mount_finished;
492
	if (!mount_finished || (flags & SocketMonitor::HANGUP) != 0)
493 494 495
		/* until the mount is finished, the NFS client may use
		   various sockets, therefore we unregister and
		   re-register it each time */
496 497 498 499
		/* also re-register the socket if we got a HANGUP,
		   which is a sure sign that libnfs will close the
		   socket, which can lead to a race condition if
		   epoll_ctl() is called later */
500 501 502 503
		SocketMonitor::Steal();

	const int result = Service(flags);

504
	while (!deferred_close.empty()) {
505
		InternalClose(deferred_close.front());
506 507 508
		deferred_close.pop_front();
	}

509
	if (!was_mounted && mount_finished) {
510
		if (postponed_mount_error) {
511 512 513 514 515 516 517 518
			DestroyContext();
			closed = true;
			BroadcastMountError(std::move(postponed_mount_error));
		} else if (result == 0)
			BroadcastMountSuccess();
	} else if (result < 0) {
		/* the connection has failed */

519 520 521
		auto e = FormatRuntimeError("NFS connection has failed: %s",
					    nfs_get_error(context));
		BroadcastError(std::make_exception_ptr(e));
522

523 524
		DestroyContext();
		closed = true;
525
	} else if (nfs_get_fd(context) < 0) {
526
		/* this happens when rpc_reconnect_requeue() is called
527
		   after the connection broke, but autoreconnect was
528
		   disabled - nfs_service() returns 0 */
529

530 531
		const char *msg = nfs_get_error(context);
		if (msg == nullptr)
532 533
			msg = "<unknown>";
		auto e = FormatRuntimeError("NFS socket disappeared: %s", msg);
534

535
		BroadcastError(std::make_exception_ptr(e));
536

537 538
		DestroyContext();
		closed = true;
539 540
	}

541 542
	assert(context == nullptr || nfs_get_fd(context) >= 0);

543
#ifndef NDEBUG
544 545
	assert(in_event);
	in_event = false;
546
#endif
547 548 549 550 551 552 553 554 555

	if (context != nullptr)
		ScheduleSocket();

	return !closed;
}

inline void
NfsConnection::MountCallback(int status, gcc_unused nfs_context *nfs,
556
			     gcc_unused void *data) noexcept
557
{
558
	assert(GetEventLoop().IsInside());
559 560 561 562
	assert(context == nfs);

	mount_finished = true;

563 564
	assert(mount_timeout_event.IsActive() || in_destroy);
	mount_timeout_event.Cancel();
565

566
	if (status < 0) {
567 568 569
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
		postponed_mount_error = std::make_exception_ptr(e);
570 571 572 573 574 575
		return;
	}
}

void
NfsConnection::MountCallback(int status, nfs_context *nfs, void *data,
576
			     void *private_data) noexcept
577 578 579 580 581 582
{
	NfsConnection *c = (NfsConnection *)private_data;

	c->MountCallback(status, nfs, data);
}

583 584
inline void
NfsConnection::MountInternal()
585
{
586
	assert(GetEventLoop().IsInside());
587
	assert(context == nullptr);
588 589

	context = nfs_init_context();
590 591
	if (context == nullptr)
		throw std::runtime_error("nfs_init_context() failed");
592

593
	postponed_mount_error = std::exception_ptr();
594
	mount_finished = false;
595

596
	mount_timeout_event.Schedule(NFS_MOUNT_TIMEOUT);
597

598
#ifndef NDEBUG
599 600
	in_service = false;
	in_event = false;
601
	in_destroy = false;
602
#endif
603 604 605

	if (nfs_mount_async(context, server.c_str(), export_name.c_str(),
			    MountCallback, this) != 0) {
606 607
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
608 609
		nfs_destroy_context(context);
		context = nullptr;
610
		throw e;
611 612 613 614 615 616
	}

	ScheduleSocket();
}

void
617
NfsConnection::BroadcastMountSuccess() noexcept
618
{
619 620
	assert(GetEventLoop().IsInside());

621 622 623 624 625 626 627 628
	while (!new_leases.empty()) {
		auto i = new_leases.begin();
		active_leases.splice(active_leases.end(), new_leases, i);
		(*i)->OnNfsConnectionReady();
	}
}

void
629
NfsConnection::BroadcastMountError(std::exception_ptr &&e) noexcept
630
{
631 632
	assert(GetEventLoop().IsInside());

633 634 635
	while (!new_leases.empty()) {
		auto l = new_leases.front();
		new_leases.pop_front();
636
		l->OnNfsConnectionFailed(e);
637 638
	}

639
	OnNfsConnectionError(std::move(e));
640 641 642
}

void
643
NfsConnection::BroadcastError(std::exception_ptr &&e) noexcept
644
{
645 646
	assert(GetEventLoop().IsInside());

647 648 649
	while (!active_leases.empty()) {
		auto l = active_leases.front();
		active_leases.pop_front();
650
		l->OnNfsConnectionDisconnected(e);
651 652
	}

653
	BroadcastMountError(std::move(e));
654 655
}

656
void
657
NfsConnection::OnMountTimeout() noexcept
658 659 660 661 662 663 664
{
	assert(GetEventLoop().IsInside());
	assert(!mount_finished);

	mount_finished = true;
	DestroyContext();

665
	BroadcastMountError(std::make_exception_ptr(std::runtime_error("Mount timeout")));
666 667
}

668
void
669
NfsConnection::RunDeferred() noexcept
670
{
671 672
	assert(GetEventLoop().IsInside());

673
	if (context == nullptr) {
674 675 676 677
		try {
			MountInternal();
		} catch (...) {
			BroadcastMountError(std::current_exception());
678 679 680 681
			return;
		}
	}

682
	if (mount_finished)
683 684
		BroadcastMountSuccess();
}