Connection.cxx 14.8 KB
Newer Older
1
/*
Max Kellermann's avatar
Max Kellermann committed
2
 * Copyright 2003-2020 The Music Player Daemon Project
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
 * http://www.musicpd.org
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "Connection.hxx"
21
#include "Error.hxx"
22 23
#include "Lease.hxx"
#include "Callback.hxx"
24
#include "event/Loop.hxx"
25
#include "net/SocketDescriptor.hxx"
26
#include "util/RuntimeError.hxx"
27 28 29 30 31 32 33

extern "C" {
#include <nfsc/libnfs.h>
}

#include <utility>

34 35 36
#ifdef _WIN32
#include <winsock2.h>
#else
37
#include <poll.h> /* for POLLIN, POLLOUT */
38
#endif
39

40 41
static constexpr std::chrono::steady_clock::duration NFS_MOUNT_TIMEOUT =
	std::chrono::minutes(1);
42

43
inline void
44
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
45
					 const char *path)
46
{
47 48
	assert(connection.GetEventLoop().IsInside());

49
	int result = nfs_stat64_async(ctx, path, Callback, this);
50
	if (result < 0)
51
		throw FormatRuntimeError("nfs_stat64_async() failed: %s",
52
					 nfs_get_error(ctx));
53 54
}

55 56 57 58 59 60 61 62 63 64 65 66
inline void
NfsConnection::CancellableCallback::Lstat(nfs_context *ctx,
					  const char *path)
{
	assert(connection.GetEventLoop().IsInside());

	int result = nfs_lstat64_async(ctx, path, Callback, this);
	if (result < 0)
		throw FormatRuntimeError("nfs_lstat64_async() failed: %s",
					 nfs_get_error(ctx));
}

67
inline void
68
NfsConnection::CancellableCallback::OpenDirectory(nfs_context *ctx,
69
						  const char *path)
70
{
71 72
	assert(connection.GetEventLoop().IsInside());

73
	int result = nfs_opendir_async(ctx, path, Callback, this);
74 75 76
	if (result < 0)
		throw FormatRuntimeError("nfs_opendir_async() failed: %s",
					 nfs_get_error(ctx));
77 78
}

79
inline void
80
NfsConnection::CancellableCallback::Open(nfs_context *ctx,
81
					 const char *path, int flags)
82
{
83 84
	assert(connection.GetEventLoop().IsInside());

85 86
	int result = nfs_open_async(ctx, path, flags,
				    Callback, this);
87 88 89
	if (result < 0)
		throw FormatRuntimeError("nfs_open_async() failed: %s",
					 nfs_get_error(ctx));
90 91
}

92
inline void
93
NfsConnection::CancellableCallback::Stat(nfs_context *ctx,
94
					 struct nfsfh *fh)
95
{
96 97
	assert(connection.GetEventLoop().IsInside());

98
	int result = nfs_fstat_async(ctx, fh, Callback, this);
99 100 101
	if (result < 0)
		throw FormatRuntimeError("nfs_fstat_async() failed: %s",
					 nfs_get_error(ctx));
102 103
}

104
inline void
105
NfsConnection::CancellableCallback::Read(nfs_context *ctx, struct nfsfh *fh,
106
					 uint64_t offset, size_t size)
107
{
108 109
	assert(connection.GetEventLoop().IsInside());

110
	int result = nfs_pread_async(ctx, fh, offset, size, Callback, this);
111 112 113
	if (result < 0)
		throw FormatRuntimeError("nfs_pread_async() failed: %s",
					 nfs_get_error(ctx));
114 115
}

116
inline void
117
NfsConnection::CancellableCallback::CancelAndScheduleClose(struct nfsfh *fh) noexcept
118
{
119
	assert(connection.GetEventLoop().IsInside());
120 121 122 123 124 125 126 127
	assert(!open);
	assert(close_fh == nullptr);
	assert(fh != nullptr);

	close_fh = fh;
	Cancel();
}

128
inline void
129
NfsConnection::CancellableCallback::PrepareDestroyContext() noexcept
130 131 132 133 134 135 136 137 138
{
	assert(IsCancelled());

	if (close_fh != nullptr) {
		connection.InternalClose(close_fh);
		close_fh = nullptr;
	}
}

139
inline void
140
NfsConnection::CancellableCallback::Callback(int err, void *data) noexcept
141
{
142 143
	assert(connection.GetEventLoop().IsInside());

144
	if (!IsCancelled()) {
145 146
		assert(close_fh == nullptr);

147 148 149 150 151 152 153
		NfsCallback &cb = Get();

		connection.callbacks.Remove(*this);

		if (err >= 0)
			cb.OnNfsCallback((unsigned)err, data);
		else
154
			cb.OnNfsError(std::make_exception_ptr(NfsClientError(-err, (const char *)data)));
155
	} else {
156 157 158 159
		if (open) {
			/* a nfs_open_async() call was cancelled - to
			   avoid a memory leak, close the newly
			   allocated file handle immediately */
160 161
			assert(close_fh == nullptr);

162 163 164 165
			if (err >= 0) {
				struct nfsfh *fh = (struct nfsfh *)data;
				connection.Close(fh);
			}
166 167
		} else if (close_fh != nullptr)
			connection.DeferClose(close_fh);
168

169 170 171 172 173 174 175
		connection.callbacks.Remove(*this);
	}
}

void
NfsConnection::CancellableCallback::Callback(int err,
					     gcc_unused struct nfs_context *nfs,
176 177
					     void *data,
					     void *private_data) noexcept
178 179 180 181 182 183
{
	CancellableCallback &c = *(CancellableCallback *)private_data;
	c.Callback(err, data);
}

static constexpr unsigned
184
libnfs_to_events(int i) noexcept
185 186 187 188 189 190
{
	return ((i & POLLIN) ? SocketMonitor::READ : 0) |
		((i & POLLOUT) ? SocketMonitor::WRITE : 0);
}

static constexpr int
191
events_to_libnfs(unsigned i) noexcept
192 193 194 195 196
{
	return ((i & SocketMonitor::READ) ? POLLIN : 0) |
		((i & SocketMonitor::WRITE) ? POLLOUT : 0);
}

197
NfsConnection::~NfsConnection() noexcept
198
{
199
	assert(GetEventLoop().IsInside());
200 201 202
	assert(new_leases.empty());
	assert(active_leases.empty());
	assert(callbacks.IsEmpty());
203
	assert(deferred_close.empty());
204 205

	if (context != nullptr)
206
		DestroyContext();
207 208 209
}

void
210
NfsConnection::AddLease(NfsLease &lease) noexcept
211
{
212 213 214
	assert(GetEventLoop().IsInside());

	new_leases.push_back(&lease);
215

216
	defer_new_lease.Schedule();
217 218 219
}

void
220
NfsConnection::RemoveLease(NfsLease &lease) noexcept
221
{
222
	assert(GetEventLoop().IsInside());
223 224 225 226 227

	new_leases.remove(&lease);
	active_leases.remove(&lease);
}

228 229
void
NfsConnection::Stat(const char *path, NfsCallback &callback)
230
{
231
	assert(GetEventLoop().IsInside());
232 233 234
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, false);
235 236 237
	try {
		c.Stat(context, path);
	} catch (...) {
238
		callbacks.Remove(c);
239
		throw;
240 241 242 243 244
	}

	ScheduleSocket();
}

245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
void
NfsConnection::Lstat(const char *path, NfsCallback &callback)
{
	assert(GetEventLoop().IsInside());
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, false);
	try {
		c.Lstat(context, path);
	} catch (...) {
		callbacks.Remove(c);
		throw;
	}

	ScheduleSocket();
}

262 263
void
NfsConnection::OpenDirectory(const char *path, NfsCallback &callback)
264
{
265
	assert(GetEventLoop().IsInside());
266 267 268
	assert(!callbacks.Contains(callback));

	auto &c = callbacks.Add(callback, *this, true);
269 270 271
	try {
		c.OpenDirectory(context, path);
	} catch (...) {
272
		callbacks.Remove(c);
273
		throw;
274 275 276 277 278 279
	}

	ScheduleSocket();
}

const struct nfsdirent *
280
NfsConnection::ReadDirectory(struct nfsdir *dir) noexcept
281
{
282 283
	assert(GetEventLoop().IsInside());

284 285 286 287
	return nfs_readdir(context, dir);
}

void
288
NfsConnection::CloseDirectory(struct nfsdir *dir) noexcept
289
{
290 291
	assert(GetEventLoop().IsInside());

292 293 294
	return nfs_closedir(context, dir);
}

295 296
void
NfsConnection::Open(const char *path, int flags, NfsCallback &callback)
297
{
298
	assert(GetEventLoop().IsInside());
299 300
	assert(!callbacks.Contains(callback));

301
	auto &c = callbacks.Add(callback, *this, true);
302 303 304
	try {
		c.Open(context, path, flags);
	} catch (...) {
305
		callbacks.Remove(c);
306
		throw;
307 308 309 310 311
	}

	ScheduleSocket();
}

312 313
void
NfsConnection::Stat(struct nfsfh *fh, NfsCallback &callback)
314
{
315
	assert(GetEventLoop().IsInside());
316 317
	assert(!callbacks.Contains(callback));

318
	auto &c = callbacks.Add(callback, *this, false);
319 320 321
	try {
		c.Stat(context, fh);
	} catch (...) {
322
		callbacks.Remove(c);
323
		throw;
324 325 326 327 328
	}

	ScheduleSocket();
}

329
void
330
NfsConnection::Read(struct nfsfh *fh, uint64_t offset, size_t size,
331
		    NfsCallback &callback)
332
{
333
	assert(GetEventLoop().IsInside());
334 335
	assert(!callbacks.Contains(callback));

336
	auto &c = callbacks.Add(callback, *this, false);
337 338 339
	try {
		c.Read(context, fh, offset, size);
	} catch (...) {
340
		callbacks.Remove(c);
341
		throw;
342 343 344 345 346 347
	}

	ScheduleSocket();
}

void
348
NfsConnection::Cancel(NfsCallback &callback) noexcept
349 350 351 352 353
{
	callbacks.Cancel(callback);
}

static void
354
DummyCallback(int, struct nfs_context *, void *, void *) noexcept
355 356 357
{
}

358
inline void
359
NfsConnection::InternalClose(struct nfsfh *fh) noexcept
360 361 362 363 364 365 366 367
{
	assert(GetEventLoop().IsInside());
	assert(context != nullptr);
	assert(fh != nullptr);

	nfs_close_async(context, fh, DummyCallback, nullptr);
}

368
void
369
NfsConnection::Close(struct nfsfh *fh) noexcept
370
{
371 372
	assert(GetEventLoop().IsInside());

373
	InternalClose(fh);
374 375 376
	ScheduleSocket();
}

377
void
378
NfsConnection::CancelAndClose(struct nfsfh *fh, NfsCallback &callback) noexcept
379 380 381 382 383
{
	CancellableCallback &cancel = callbacks.Get(callback);
	cancel.CancelAndScheduleClose(fh);
}

384
void
385
NfsConnection::DestroyContext() noexcept
386
{
387
	assert(GetEventLoop().IsInside());
388 389
	assert(context != nullptr);

390 391 392 393 394
#ifndef NDEBUG
	assert(!in_destroy);
	in_destroy = true;
#endif

395
	if (!mount_finished) {
396 397
		assert(mount_timeout_event.IsActive());
		mount_timeout_event.Cancel();
398 399
	}

400
	/* cancel pending DeferEvent that was scheduled to notify
401
	   new leases */
402
	defer_new_lease.Cancel();
403

404
	if (SocketMonitor::IsDefined())
405
		SocketMonitor::Steal();
406

407 408 409 410
	callbacks.ForEach([](CancellableCallback &c){
			c.PrepareDestroyContext();
		});

411 412 413 414
	nfs_destroy_context(context);
	context = nullptr;
}

415
inline void
416
NfsConnection::DeferClose(struct nfsfh *fh) noexcept
417
{
418
	assert(GetEventLoop().IsInside());
419 420
	assert(in_event);
	assert(in_service);
421 422
	assert(context != nullptr);
	assert(fh != nullptr);
423 424 425 426

	deferred_close.push_front(fh);
}

427
void
428
NfsConnection::ScheduleSocket() noexcept
429
{
430
	assert(GetEventLoop().IsInside());
431 432
	assert(context != nullptr);

433 434 435 436 437 438 439 440 441 442 443
	const int which_events = nfs_which_events(context);

	if (which_events == POLLOUT && SocketMonitor::IsDefined())
		/* kludge: if libnfs asks only for POLLOUT, it means
		   that it is currently waiting for the connect() to
		   finish - rpc_reconnect_requeue() may have been
		   called from inside nfs_service(); we must now
		   unregister the old socket and register the new one
		   instead */
		SocketMonitor::Steal();

444
	if (!SocketMonitor::IsDefined()) {
445 446
		SocketDescriptor _fd(nfs_get_fd(context));
		if (!_fd.IsDefined())
447 448
			return;

449
		_fd.EnableCloseOnExec();
450
		SocketMonitor::Open(_fd);
451 452
	}

453 454
	SocketMonitor::Schedule(libnfs_to_events(which_events)
				| SocketMonitor::HANGUP);
455 456
}

457
inline int
458
NfsConnection::Service(unsigned flags) noexcept
459
{
460
	assert(GetEventLoop().IsInside());
461
	assert(context != nullptr);
462

463
#ifndef NDEBUG
464 465 466 467 468
	assert(!in_event);
	in_event = true;

	assert(!in_service);
	in_service = true;
469
#endif
470 471 472

	int result = nfs_service(context, events_to_libnfs(flags));

473
#ifndef NDEBUG
474 475 476
	assert(context != nullptr);
	assert(in_service);
	in_service = false;
477
#endif
478

479 480 481 482
	return result;
}

bool
483
NfsConnection::OnSocketReady(unsigned flags) noexcept
484 485 486 487 488 489 490
{
	assert(GetEventLoop().IsInside());
	assert(deferred_close.empty());

	bool closed = false;

	const bool was_mounted = mount_finished;
491
	if (!mount_finished || (flags & SocketMonitor::HANGUP) != 0)
492 493 494
		/* until the mount is finished, the NFS client may use
		   various sockets, therefore we unregister and
		   re-register it each time */
495 496 497 498
		/* also re-register the socket if we got a HANGUP,
		   which is a sure sign that libnfs will close the
		   socket, which can lead to a race condition if
		   epoll_ctl() is called later */
499 500 501 502
		SocketMonitor::Steal();

	const int result = Service(flags);

503
	while (!deferred_close.empty()) {
504
		InternalClose(deferred_close.front());
505 506 507
		deferred_close.pop_front();
	}

508
	if (!was_mounted && mount_finished) {
509
		if (postponed_mount_error) {
510 511 512 513 514 515 516 517
			DestroyContext();
			closed = true;
			BroadcastMountError(std::move(postponed_mount_error));
		} else if (result == 0)
			BroadcastMountSuccess();
	} else if (result < 0) {
		/* the connection has failed */

518 519 520
		auto e = FormatRuntimeError("NFS connection has failed: %s",
					    nfs_get_error(context));
		BroadcastError(std::make_exception_ptr(e));
521

522 523
		DestroyContext();
		closed = true;
524
	} else if (nfs_get_fd(context) < 0) {
525
		/* this happens when rpc_reconnect_requeue() is called
526
		   after the connection broke, but autoreconnect was
527
		   disabled - nfs_service() returns 0 */
528

529 530
		const char *msg = nfs_get_error(context);
		if (msg == nullptr)
531 532
			msg = "<unknown>";
		auto e = FormatRuntimeError("NFS socket disappeared: %s", msg);
533

534
		BroadcastError(std::make_exception_ptr(e));
535

536 537
		DestroyContext();
		closed = true;
538 539
	}

540 541
	assert(context == nullptr || nfs_get_fd(context) >= 0);

542
#ifndef NDEBUG
543 544
	assert(in_event);
	in_event = false;
545
#endif
546 547 548 549 550 551 552 553 554

	if (context != nullptr)
		ScheduleSocket();

	return !closed;
}

inline void
NfsConnection::MountCallback(int status, gcc_unused nfs_context *nfs,
555
			     gcc_unused void *data) noexcept
556
{
557
	assert(GetEventLoop().IsInside());
558 559 560 561
	assert(context == nfs);

	mount_finished = true;

562 563
	assert(mount_timeout_event.IsActive() || in_destroy);
	mount_timeout_event.Cancel();
564

565
	if (status < 0) {
566 567 568
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
		postponed_mount_error = std::make_exception_ptr(e);
569 570 571 572 573 574
		return;
	}
}

void
NfsConnection::MountCallback(int status, nfs_context *nfs, void *data,
575
			     void *private_data) noexcept
576 577 578 579 580 581
{
	NfsConnection *c = (NfsConnection *)private_data;

	c->MountCallback(status, nfs, data);
}

582 583
inline void
NfsConnection::MountInternal()
584
{
585
	assert(GetEventLoop().IsInside());
586
	assert(context == nullptr);
587 588

	context = nfs_init_context();
589 590
	if (context == nullptr)
		throw std::runtime_error("nfs_init_context() failed");
591

592
	postponed_mount_error = std::exception_ptr();
593
	mount_finished = false;
594

595
	mount_timeout_event.Schedule(NFS_MOUNT_TIMEOUT);
596

597
#ifndef NDEBUG
598 599
	in_service = false;
	in_event = false;
600
	in_destroy = false;
601
#endif
602 603 604

	if (nfs_mount_async(context, server.c_str(), export_name.c_str(),
			    MountCallback, this) != 0) {
605 606
		auto e = FormatRuntimeError("nfs_mount_async() failed: %s",
					    nfs_get_error(context));
607 608
		nfs_destroy_context(context);
		context = nullptr;
609
		throw e;
610 611 612 613 614 615
	}

	ScheduleSocket();
}

void
616
NfsConnection::BroadcastMountSuccess() noexcept
617
{
618 619
	assert(GetEventLoop().IsInside());

620 621 622 623 624 625 626 627
	while (!new_leases.empty()) {
		auto i = new_leases.begin();
		active_leases.splice(active_leases.end(), new_leases, i);
		(*i)->OnNfsConnectionReady();
	}
}

void
628
NfsConnection::BroadcastMountError(std::exception_ptr &&e) noexcept
629
{
630 631
	assert(GetEventLoop().IsInside());

632 633 634
	while (!new_leases.empty()) {
		auto l = new_leases.front();
		new_leases.pop_front();
635
		l->OnNfsConnectionFailed(e);
636 637
	}

638
	OnNfsConnectionError(std::move(e));
639 640 641
}

void
642
NfsConnection::BroadcastError(std::exception_ptr &&e) noexcept
643
{
644 645
	assert(GetEventLoop().IsInside());

646 647 648
	while (!active_leases.empty()) {
		auto l = active_leases.front();
		active_leases.pop_front();
649
		l->OnNfsConnectionDisconnected(e);
650 651
	}

652
	BroadcastMountError(std::move(e));
653 654
}

655
void
656
NfsConnection::OnMountTimeout() noexcept
657 658 659 660 661 662 663
{
	assert(GetEventLoop().IsInside());
	assert(!mount_finished);

	mount_finished = true;
	DestroyContext();

664
	BroadcastMountError(std::make_exception_ptr(std::runtime_error("Mount timeout")));
665 666
}

667
void
668
NfsConnection::RunDeferred() noexcept
669
{
670 671
	assert(GetEventLoop().IsInside());

672
	if (context == nullptr) {
673 674 675 676
		try {
			MountInternal();
		} catch (...) {
			BroadcastMountError(std::current_exception());
677 678 679 680
			return;
		}
	}

681
	if (mount_finished)
682 683
		BroadcastMountSuccess();
}