// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>
#include <linux/falloc.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"

static __le32 ceph_flags_sys2wire(u32 flags)
{
	u32 wire_flags = 0;

	switch (flags & O_ACCMODE) {
	case O_RDONLY:
		wire_flags |= CEPH_O_RDONLY;
		break;
	case O_WRONLY:
		wire_flags |= CEPH_O_WRONLY;
		break;
	case O_RDWR:
		wire_flags |= CEPH_O_RDWR;
		break;
	}

	flags &= ~O_ACCMODE;

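/* the remaining flags have identically named CEPH_* wire counterparts */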
#define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; }

	ceph_sys2wire(O_CREAT);
	ceph_sys2wire(O_EXCL);
	ceph_sys2wire(O_TRUNC);
	ceph_sys2wire(O_DIRECTORY);
	ceph_sys2wire(O_NOFOLLOW);

#undef ceph_sys2wire

	if (flags)
		dout("unused open flags: %x\n", flags);

	return cpu_to_le32(wire_flags);
}

/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 *  - buffered uses the generic_file_aio_{read,write} helpers
 *
 *  - synchronous is used when there is multi-client read/write
 *    sharing, avoids the page cache, and synchronously waits for an
 *    ack from the OSD.
 *
 *  - direct io takes the variant of the sync path that references
 *    user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */

/*
 * How many pages to get in one call to iov_iter_get_pages().  This
 * determines the size of the on-stack array used as a buffer.
 */
#define ITER_GET_BVECS_PAGES	64

static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
				struct bio_vec *bvecs)
{
	size_t size = 0;
	int bvec_idx = 0;

	if (maxsize > iov_iter_count(iter))
		maxsize = iov_iter_count(iter);

	while (size < maxsize) {
		struct page *pages[ITER_GET_BVECS_PAGES];
		ssize_t bytes;
		size_t start;
		int idx = 0;

		bytes = iov_iter_get_pages(iter, pages, maxsize - size,
					   ITER_GET_BVECS_PAGES, &start);
		if (bytes < 0)
			return size ?: bytes;

		iov_iter_advance(iter, bytes);
		size += bytes;

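		/* wrap each pinned page in a bio_vec, splitting at page boundaries */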
		for ( ; bytes; idx++, bvec_idx++) {
			struct bio_vec bv = {
				.bv_page = pages[idx],
				.bv_len = min_t(int, bytes, PAGE_SIZE - start),
				.bv_offset = start,
			};

			bvecs[bvec_idx] = bv;
			bytes -= bv.bv_len;
			start = 0;
		}
	}

	return size;
}

/*
 * iov_iter_get_pages() only considers one iov_iter segment, no matter
 * what maxsize or maxpages are given.  For ITER_BVEC that is a single
 * page.
 *
 * Attempt to get up to @maxsize bytes worth of pages from @iter.
 * Return the number of bytes in the created bio_vec array, or an error.
 */
static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize,
				    struct bio_vec **bvecs, int *num_bvecs)
{
	struct bio_vec *bv;
	size_t orig_count = iov_iter_count(iter);
	ssize_t bytes;
	int npages;

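	/* temporarily shrink the iter so npages counts only the first maxsize bytes */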
	iov_iter_truncate(iter, maxsize);
	npages = iov_iter_npages(iter, INT_MAX);
	iov_iter_reexpand(iter, orig_count);

	/*
	 * __iter_get_bvecs() may populate only part of the array -- zero it
	 * out.
	 */
	bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO);
	if (!bv)
		return -ENOMEM;

	bytes = __iter_get_bvecs(iter, maxsize, bv);
	if (bytes < 0) {
		/*
		 * No pages were pinned -- just free the array.
		 */
		kvfree(bv);
		return bytes;
	}

	*bvecs = bv;
	*num_bvecs = npages;
	return bytes;
}

static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty)
{
	int i;

	for (i = 0; i < num_bvecs; i++) {
		if (bvecs[i].bv_page) {
			if (should_dirty)
				set_page_dirty_lock(bvecs[i].bv_page);
			put_page(bvecs[i].bv_page);
		}
	}
	kvfree(bvecs);
}

/*
 * Prepare an open request.  Preallocate ceph_cap to avoid an
 * inopportune ENOMEM later.
 */
static struct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int want_auth = USE_ANY_MDS;
	int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;

	if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
		want_auth = USE_AUTH_MDS;

	req = ceph_mdsc_create_request(mdsc, op, want_auth);
	if (IS_ERR(req))
		goto out;
	req->r_fmode = ceph_flags_to_mode(flags);
	req->r_args.open.flags = ceph_flags_sys2wire(flags);
	req->r_args.open.mode = cpu_to_le32(create_mode);
out:
	return req;
}

static int ceph_init_file_info(struct inode *inode, struct file *file,
					int fmode, bool isdir)
{
	struct ceph_file_info *fi;

	dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
			inode->i_mode, isdir ? "dir" : "regular");
	BUG_ON(inode->i_fop->release != ceph_release);

	if (isdir) {
		struct ceph_dir_file_info *dfi =
			kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
		if (!dfi) {
			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
			return -ENOMEM;
		}

		file->private_data = dfi;
		fi = &dfi->file_info;
		dfi->next_offset = 2;
		dfi->readdir_cache_idx = -1;
	} else {
		fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
		if (!fi) {
			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
			return -ENOMEM;
		}

		file->private_data = fi;
	}

	fi->fmode = fmode;
	spin_lock_init(&fi->rw_contexts_lock);
	INIT_LIST_HEAD(&fi->rw_contexts);

	return 0;
}

/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
	int ret = 0;

	switch (inode->i_mode & S_IFMT) {
	case S_IFREG:
		ceph_fscache_register_inode_cookie(inode);
		ceph_fscache_file_set_cookie(inode, file);
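		/* fall through */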
	case S_IFDIR:
		ret = ceph_init_file_info(inode, file, fmode,
						S_ISDIR(inode->i_mode));
		if (ret)
			return ret;
		break;

	case S_IFLNK:
		dout("init_file %p %p 0%o (symlink)\n", inode, file,
		     inode->i_mode);
		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
		break;

	default:
		dout("init_file %p %p 0%o (special)\n", inode, file,
		     inode->i_mode);
		/*
		 * we need to drop the open ref now, since we don't
		 * have .release set to ceph_release.
		 */
		ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
		BUG_ON(inode->i_fop->release == ceph_release);

		/* call the proper open fop */
		ret = inode->i_fop->open(inode, file);
	}
	return ret;
}

/*
 * try renew caps after session gets killed.
 */
int ceph_renew_caps(struct inode *inode)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_request *req;
	int err, flags, wanted;

	spin_lock(&ci->i_ceph_lock);
	wanted = __ceph_caps_file_wanted(ci);
	if (__ceph_is_any_real_caps(ci) &&
	    (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) {
		int issued = __ceph_caps_issued(ci, NULL);
		spin_unlock(&ci->i_ceph_lock);
		dout("renew caps %p want %s issued %s updating mds_wanted\n",
		     inode, ceph_cap_string(wanted), ceph_cap_string(issued));
		ceph_check_caps(ci, 0, NULL);
		return 0;
	}
	spin_unlock(&ci->i_ceph_lock);

	flags = 0;
	if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR))
		flags = O_RDWR;
	else if (wanted & CEPH_CAP_FILE_RD)
		flags = O_RDONLY;
	else if (wanted & CEPH_CAP_FILE_WR)
		flags = O_WRONLY;
#ifdef O_LAZY
	if (wanted & CEPH_CAP_FILE_LAZYIO)
		flags |= O_LAZY;
#endif

	req = prepare_open_request(inode->i_sb, flags, 0);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_inode = inode;
	ihold(inode);
	req->r_num_caps = 1;
	req->r_fmode = -1;

	err = ceph_mdsc_do_request(mdsc, NULL, req);
	ceph_mdsc_put_request(req);
out:
	dout("renew caps %p open result=%d\n", inode, err);
	return err < 0 ? err : 0;
}

/*
 * If we already have the requisite capabilities, we can satisfy
 * the open request locally (no need to request new caps from the
 * MDS).  We do, however, need to inform the MDS (asynchronously)
 * if our wanted caps set expands.
 */
int ceph_open(struct inode *inode, struct file *file)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct ceph_file_info *fi = file->private_data;
	int err;
	int flags, fmode, wanted;

	if (fi) {
		dout("open file %p is already opened\n", file);
		return 0;
	}

	/* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
	flags = file->f_flags & ~(O_CREAT|O_EXCL);
	if (S_ISDIR(inode->i_mode))
		flags = O_DIRECTORY;  /* mds likes to know */

	dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
	     ceph_vinop(inode), file, flags, file->f_flags);
	fmode = ceph_flags_to_mode(flags);
	wanted = ceph_caps_for_mode(fmode);

	/* snapped files are read-only */
	if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
		return -EROFS;

	/* trivially open snapdir */
	if (ceph_snap(inode) == CEPH_SNAPDIR) {
		spin_lock(&ci->i_ceph_lock);
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);
		return ceph_init_file(inode, file, fmode);
	}

	/*
	 * No need to block if we have caps on the auth MDS (for
	 * write) or any MDS (for read).  Update wanted set
	 * asynchronously.
	 */
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_is_any_real_caps(ci) &&
	    (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
		int mds_wanted = __ceph_caps_mds_wanted(ci, true);
		int issued = __ceph_caps_issued(ci, NULL);

		dout("open %p fmode %d want %s issued %s using existing\n",
		     inode, fmode, ceph_cap_string(wanted),
		     ceph_cap_string(issued));
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);

		/* adjust wanted? */
		if ((issued & wanted) != wanted &&
		    (mds_wanted & wanted) != wanted &&
		    ceph_snap(inode) != CEPH_SNAPDIR)
			ceph_check_caps(ci, 0, NULL);

		return ceph_init_file(inode, file, fmode);
	} else if (ceph_snap(inode) != CEPH_NOSNAP &&
		   (ci->i_snap_caps & wanted) == wanted) {
		__ceph_get_fmode(ci, fmode);
		spin_unlock(&ci->i_ceph_lock);
		return ceph_init_file(inode, file, fmode);
	}

	spin_unlock(&ci->i_ceph_lock);

	dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
	req = prepare_open_request(inode->i_sb, flags, 0);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_inode = inode;
	ihold(inode);

	req->r_num_caps = 1;
	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (!err)
		err = ceph_init_file(inode, file, req->r_fmode);
	ceph_mdsc_put_request(req);
	dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
out:
	return err;
}


/*
 * Do a lookup + open with a single request.  If we get a non-existent
 * file or symlink, return 1 so the VFS can retry.
 */
int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
		     struct file *file, unsigned flags, umode_t mode,
		     int *opened)
{
	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	struct dentry *dn;
	struct ceph_acls_info acls = {};
	int mask;
	int err;

	dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
	     dir, dentry, dentry,
	     d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);

	if (dentry->d_name.len > NAME_MAX)
		return -ENAMETOOLONG;

	if (flags & O_CREAT) {
		if (ceph_quota_is_max_files_exceeded(dir))
			return -EDQUOT;
		err = ceph_pre_init_acls(dir, &mode, &acls);
		if (err < 0)
			return err;
	}

	/* do the open */
	req = prepare_open_request(dir->i_sb, flags, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_acl;
	}
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	if (flags & O_CREAT) {
		req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
		if (acls.pagelist) {
			req->r_pagelist = acls.pagelist;
			acls.pagelist = NULL;
		}
	}

	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
	if (ceph_security_xattr_wanted(dir))
		mask |= CEPH_CAP_XATTR_SHARED;
	req->r_args.open.mask = cpu_to_le32(mask);

	req->r_parent = dir;
	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
	err = ceph_mdsc_do_request(mdsc,
				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
				   req);
	err = ceph_handle_snapdir(req, dentry, err);
	if (err)
		goto out_req;

	if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
		err = ceph_handle_notrace_create(dir, dentry);

	if (d_in_lookup(dentry)) {
		dn = ceph_finish_lookup(req, dentry, err);
		if (IS_ERR(dn))
			err = PTR_ERR(dn);
	} else {
		/* we were given a hashed negative dentry */
		dn = NULL;
	}
	if (err)
		goto out_req;
	if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) {
		/* make vfs retry on splice, ENOENT, or symlink */
		dout("atomic_open finish_no_open on dn %p\n", dn);
		err = finish_no_open(file, dn);
	} else {
		dout("atomic_open finish_open on dn %p\n", dn);
		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
			ceph_init_inode_acls(d_inode(dentry), &acls);
			*opened |= FILE_CREATED;
		}
		err = finish_open(file, dentry, ceph_open, opened);
	}
out_req:
	if (!req->r_err && req->r_target_inode)
		ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
	ceph_mdsc_put_request(req);
out_acl:
	ceph_release_acls_info(&acls);
	dout("atomic_open result=%d\n", err);
	return err;
}

int ceph_release(struct inode *inode, struct file *file)
{
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (S_ISDIR(inode->i_mode)) {
		struct ceph_dir_file_info *dfi = file->private_data;
		dout("release inode %p dir file %p\n", inode, file);
		WARN_ON(!list_empty(&dfi->file_info.rw_contexts));

		ceph_put_fmode(ci, dfi->file_info.fmode);

		if (dfi->last_readdir)
			ceph_mdsc_put_request(dfi->last_readdir);
		kfree(dfi->last_name);
		kfree(dfi->dir_info);
		kmem_cache_free(ceph_dir_file_cachep, dfi);
	} else {
		struct ceph_file_info *fi = file->private_data;
		dout("release inode %p regular file %p\n", inode, file);
		WARN_ON(!list_empty(&fi->rw_contexts));

		ceph_put_fmode(ci, fi->fmode);
		kmem_cache_free(ceph_file_cachep, fi);
	}

	/* wake up anyone waiting for caps on this inode */
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

enum {
	HAVE_RETRIED = 1,
	CHECK_EOF =    2,
	READ_INLINE =  3,
};

/*
 * Read a range of bytes striped over one or more objects.  Iterate over
 * objects we stripe over.  (That's not atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
static int striped_read(struct inode *inode,
			u64 pos, u64 len,
			struct page **pages, int num_pages,
			int page_align, int *checkeof)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 this_len;
	loff_t i_size;
	int page_idx;
	int ret, read = 0;
	bool hit_stripe, was_short;

	/*
	 * we may need to do multiple reads.  not atomic, unfortunately.
	 */
more:
	this_len = len;
	page_idx = (page_align + read) >> PAGE_SHIFT;
	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
				  &ci->i_layout, pos, &this_len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  pages + page_idx, num_pages - page_idx,
				  ((page_align + read) & ~PAGE_MASK));
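	/* ENOENT means the object doesn't exist yet: treat it as a hole */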
	if (ret == -ENOENT)
		ret = 0;
	hit_stripe = this_len < len;
	was_short = ret >= 0 && ret < this_len;
	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

	i_size = i_size_read(inode);
	if (ret >= 0) {
		if (was_short && (pos + ret < i_size)) {
			int zlen = min(this_len - ret, i_size - pos - ret);
			int zoff = page_align + read + ret;
			dout(" zero gap %llu to %llu\n",
			     pos + ret, pos + ret + zlen);
			ceph_zero_page_vector_range(zoff, zlen, pages);
			ret += zlen;
		}

		read += ret;
		pos += ret;
		len -= ret;

		/* hit a stripe boundary and need to continue */
		if (len && hit_stripe && pos < i_size)
			goto more;
	}

	if (read > 0) {
		ret = read;
		/* did we bounce off eof? */
		if (pos + len > i_size)
			*checkeof = CHECK_EOF;
	}

	dout("striped_read returns %d\n", ret);
	return ret;
}

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 */
static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
			      int *checkeof)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct page **pages;
	u64 off = iocb->ki_pos;
	int num_pages;
	ssize_t ret;
	size_t len = iov_iter_count(to);

	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

	if (!len)
		return 0;
	/*
	 * flush any page cache pages in this range.  this
	 * will make concurrent normal and sync io slow,
	 * but it will at least behave sensibly when they are
	 * in sequence.
	 */
	ret = filemap_write_and_wait_range(inode->i_mapping, off,
						off + len);
	if (ret < 0)
		return ret;

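	/* for ITER_PIPE, read into pages that iov_iter allocates for the pipe */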
	if (unlikely(to->type & ITER_PIPE)) {
		size_t page_off;
		ret = iov_iter_get_pages_alloc(to, &pages, len,
					       &page_off);
		if (ret <= 0)
			return -ENOMEM;
		num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);

		ret = striped_read(inode, off, ret, pages, num_pages,
				   page_off, checkeof);
		if (ret > 0) {
			iov_iter_advance(to, ret);
			off += ret;
		} else {
			iov_iter_advance(to, 0);
		}
		ceph_put_page_vector(pages, num_pages, false);
	} else {
		num_pages = calc_pages_for(off, len);
		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
		if (IS_ERR(pages))
			return PTR_ERR(pages);

		ret = striped_read(inode, off, len, pages, num_pages,
				   (off & ~PAGE_MASK), checkeof);
		if (ret > 0) {
			int l, k = 0;
			size_t left = ret;

			while (left) {
				size_t page_off = off & ~PAGE_MASK;
				size_t copy = min_t(size_t, left,
						    PAGE_SIZE - page_off);
				l = copy_page_to_iter(pages[k++], page_off,
						      copy, to);
				off += l;
				left -= l;
				if (l < copy)
					break;
			}
		}
		ceph_release_page_vector(pages, num_pages);
	}

	if (off > iocb->ki_pos) {
		ret = off - iocb->ki_pos;
		iocb->ki_pos = off;
	}

	dout("sync_read result %zd\n", ret);
	return ret;
}

struct ceph_aio_request {
	struct kiocb *iocb;
	size_t total_len;
	bool write;
	bool should_dirty;
	int error;
	struct list_head osd_reqs;
	unsigned num_reqs;
	atomic_t pending_reqs;
	struct timespec mtime;
	struct ceph_cap_flush *prealloc_cf;
};

struct ceph_aio_work {
	struct work_struct work;
	struct ceph_osd_request *req;
};

static void ceph_aio_retry_work(struct work_struct *work);

static void ceph_aio_complete(struct inode *inode,
			      struct ceph_aio_request *aio_req)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

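	/* only the last sub-request to complete finishes the aio */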
	if (!atomic_dec_and_test(&aio_req->pending_reqs))
		return;

	ret = aio_req->error;
	if (!ret)
		ret = aio_req->total_len;

	dout("ceph_aio_complete %p rc %d\n", inode, ret);

	if (ret >= 0 && aio_req->write) {
		int dirty;

		loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
		if (endoff > i_size_read(inode)) {
			if (ceph_inode_set_size(inode, endoff))
				ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
		}

		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &aio_req->prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);

	}

	ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
						CEPH_CAP_FILE_RD));

	aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);

	ceph_free_cap_flush(aio_req->prealloc_cf);
	kfree(aio_req);
}

static void ceph_aio_complete_req(struct ceph_osd_request *req)
{
	int rc = req->r_result;
	struct inode *inode = req->r_inode;
	struct ceph_aio_request *aio_req = req->r_priv;
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);

	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
	BUG_ON(!osd_data->num_bvecs);

	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
	     inode, rc, osd_data->bvec_pos.iter.bi_size);

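	/* the snap context went stale mid-write; redo the write from a workqueue */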
	if (rc == -EOLDSNAPC) {
		struct ceph_aio_work *aio_work;
		BUG_ON(!aio_req->write);

		aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
		if (aio_work) {
			INIT_WORK(&aio_work->work, ceph_aio_retry_work);
			aio_work->req = req;
			queue_work(ceph_inode_to_client(inode)->wb_wq,
				   &aio_work->work);
			return;
		}
		rc = -ENOMEM;
	} else if (!aio_req->write) {
		if (rc == -ENOENT)
			rc = 0;
		if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) {
			struct iov_iter i;
			int zlen = osd_data->bvec_pos.iter.bi_size - rc;

			/*
			 * If read is satisfied by single OSD request,
			 * it can pass EOF. Otherwise read is within
			 * i_size.
			 */
			if (aio_req->num_reqs == 1) {
				loff_t i_size = i_size_read(inode);
				loff_t endoff = aio_req->iocb->ki_pos + rc;
				if (endoff < i_size)
					zlen = min_t(size_t, zlen,
						     i_size - endoff);
				aio_req->total_len = rc + zlen;
			}

			iov_iter_bvec(&i, ITER_BVEC, osd_data->bvec_pos.bvecs,
				      osd_data->num_bvecs,
				      osd_data->bvec_pos.iter.bi_size);
			iov_iter_advance(&i, rc);
			iov_iter_zero(zlen, &i);
		}
	}

	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
		  aio_req->should_dirty);
	ceph_osdc_put_request(req);

	if (rc < 0)
		cmpxchg(&aio_req->error, 0, rc);

	ceph_aio_complete(inode, aio_req);
	return;
}

static void ceph_aio_retry_work(struct work_struct *work)
{
	struct ceph_aio_work *aio_work =
		container_of(work, struct ceph_aio_work, work);
	struct ceph_osd_request *orig_req = aio_work->req;
	struct ceph_aio_request *aio_req = orig_req->r_priv;
	struct inode *inode = orig_req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct ceph_osd_request *req;
	int ret;

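	/* resend the write against the most recent snap context */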
	spin_lock(&ci->i_ceph_lock);
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
			list_last_entry(&ci->i_cap_snaps,
					struct ceph_cap_snap,
					ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
	}
	spin_unlock(&ci->i_ceph_lock);

	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
			false, GFP_NOFS);
	if (!req) {
		ret = -ENOMEM;
		req = orig_req;
		goto out;
	}

	req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);

	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (ret) {
		ceph_osdc_put_request(req);
		req = orig_req;
		goto out;
	}

	req->r_ops[0] = orig_req->r_ops[0];

	req->r_mtime = aio_req->mtime;
	req->r_data_offset = req->r_ops[0].extent.offset;

	ceph_osdc_put_request(orig_req);

	req->r_callback = ceph_aio_complete_req;
	req->r_inode = inode;
	req->r_priv = aio_req;

	ret = ceph_osdc_start_request(req->r_osdc, req, false);
out:
	if (ret < 0) {
		req->r_result = ret;
		ceph_aio_complete_req(req);
	}

	ceph_put_snap_context(snapc);
	kfree(aio_work);
}

static ssize_t
ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
		       struct ceph_snap_context *snapc,
		       struct ceph_cap_flush **pcf)
{
	struct file *file = iocb->ki_filp;
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	struct bio_vec *bvecs;
	struct ceph_aio_request *aio_req = NULL;
	int num_pages = 0;
	int flags;
	int ret;
	struct timespec mtime = timespec64_to_timespec(current_time(inode));
	size_t count = iov_iter_count(iter);
	loff_t pos = iocb->ki_pos;
	bool write = iov_iter_rw(iter) == WRITE;
	bool should_dirty = !write && iter_is_iovec(iter);

	if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
		return -EROFS;

	dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
	     (write ? "write" : "read"), file, pos, (unsigned)count,
	     snapc, snapc->seq);

	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
	if (ret < 0)
		return ret;

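	/* for writes, drop cached pages that overlap the range being written */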
	if (write) {
		int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
					pos >> PAGE_SHIFT,
					(pos + count) >> PAGE_SHIFT);
		if (ret2 < 0)
			dout("invalidate_inode_pages2_range returned %d\n", ret2);

		flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
	} else {
		flags = CEPH_OSD_FLAG_READ;
	}

	while (iov_iter_count(iter) > 0) {
		u64 size = iov_iter_count(iter);
		ssize_t len;

		if (write)
			size = min_t(u64, size, fsc->mount_options->wsize);
		else
			size = min_t(u64, size, fsc->mount_options->rsize);

		vino = ceph_vino(inode);
		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
					    vino, pos, &size, 0,
					    1,
					    write ? CEPH_OSD_OP_WRITE :
						    CEPH_OSD_OP_READ,
					    flags, snapc,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    false);
		if (IS_ERR(req)) {
			ret = PTR_ERR(req);
			break;
		}

		len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages);
		if (len < 0) {
			ceph_osdc_put_request(req);
			ret = len;
			break;
		}
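		/* we may have pinned fewer bytes than requested; trim the op */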
		if (len != size)
			osd_req_op_extent_update(req, 0, len);

		/*
Yan, Zheng's avatar
Yan, Zheng committed
989
990
		 * To simplify error handling, allow AIO when IO within i_size
		 * or IO can be satisfied by single OSD request.
		 */
		if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
		    (len == count || pos + count <= i_size_read(inode))) {
			aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
			if (aio_req) {
				aio_req->iocb = iocb;
				aio_req->write = write;
				aio_req->should_dirty = should_dirty;
				INIT_LIST_HEAD(&aio_req->osd_reqs);
				if (write) {