Commit a76bab4952a1539266490295fb50b78802c467c2

Authored by aliguori
1 parent 1121f879

Refactor AIO to allow multiple AIO implementations

This patch refactors the AIO layer to allow multiple AIO implementations.  It's
only possible because of the recent signalfd() patch.  

Right now, the AIO infrastructure is pretty specific to the block raw backend.
For other block devices to implement AIO, the qemu_aio_wait function must
support registration.  This patch introduces a new function,
qemu_aio_set_fd_handler, which registers read/write callbacks for a file
descriptor.  qemu_aio_wait() now polls the set of file descriptors registered
with this function until one becomes readable or writable.

This patch should allow the implementation of alternative AIO backends (via a
thread pool or linux-aio) and AIO backends in non-traditional block devices
(like NBD).

Signed-off-by: Anthony Liguori <aliguori@us.ibm.com>



git-svn-id: svn://svn.savannah.nongnu.org/qemu/trunk@5297 c046a42c-6fe2-441c-8c8c-71466251a162
Makefile
... ... @@ -51,7 +51,7 @@ BLOCK_OBJS=cutils.o qemu-malloc.o
51 51 BLOCK_OBJS+=block-cow.o block-qcow.o aes.o block-vmdk.o block-cloop.o
52 52 BLOCK_OBJS+=block-dmg.o block-bochs.o block-vpc.o block-vvfat.o
53 53 BLOCK_OBJS+=block-qcow2.o block-parallels.o block-nbd.o
54   -BLOCK_OBJS+=nbd.o block.o
  54 +BLOCK_OBJS+=nbd.o block.o aio.o
55 55  
56 56 ifdef CONFIG_WIN32
57 57 BLOCK_OBJS += block-raw-win32.o
... ...
Makefile.target
... ... @@ -474,7 +474,7 @@ endif #CONFIG_DARWIN_USER
474 474 ifndef CONFIG_USER_ONLY
475 475  
476 476 OBJS=vl.o osdep.o monitor.o pci.o loader.o isa_mmio.o machine.o net-checksum.o
477   -OBJS+=fw_cfg.o
  477 +OBJS+=fw_cfg.o aio.o
478 478 ifdef CONFIG_WIN32
479 479 OBJS+=block-raw-win32.o
480 480 else
... ...
aio.c 0 → 100644
  1 +/*
  2 + * QEMU aio implementation
  3 + *
  4 + * Copyright IBM, Corp. 2008
  5 + *
  6 + * Authors:
  7 + * Anthony Liguori <aliguori@us.ibm.com>
  8 + *
  9 + * This work is licensed under the terms of the GNU GPL, version 2. See
  10 + * the COPYING file in the top-level directory.
  11 + *
  12 + */
  13 +
  14 +#include "qemu-common.h"
  15 +#include "block.h"
  16 +#include "sys-queue.h"
  17 +#include "qemu_socket.h"
  18 +
  19 +typedef struct AioHandler AioHandler;
  20 +
  21 +/* The list of registered AIO handlers */
  22 +static LIST_HEAD(, AioHandler) aio_handlers;
  23 +
  24 +/* This is a simple lock used to protect the aio_handlers list. Specifically,
  25 + * it's used to ensure that no callbacks are removed while we're walking and
  26 + * dispatching callbacks.
  27 + */
  28 +static int walking_handlers;
  29 +
  30 +struct AioHandler /* one registered fd together with its AIO callbacks */
  31 +{
  32 + int fd; /* descriptor added to the select() sets in qemu_aio_wait() */
  33 + IOHandler *io_read; /* called with opaque when fd is readable; may be NULL */
  34 + IOHandler *io_write; /* called with opaque when fd is writable; may be NULL */
  35 + AioFlushHandler *io_flush; /* returns 0 when nothing is pending; qemu_aio_wait() tolerates NULL */
  36 + int deleted; /* set instead of freeing while walking_handlers is held */
  37 + void *opaque; /* passed unchanged to io_read, io_write and io_flush */
  38 + LIST_ENTRY(AioHandler) node; /* linkage in the aio_handlers list */
  39 +};
  40 +
  41 +static AioHandler *find_aio_handler(int fd)
  42 +{
  43 + AioHandler *node;
  44 +
  45 + LIST_FOREACH(node, &aio_handlers, node) {
  46 + if (node->fd == fd)
  47 + return node;
  48 + }
  49 +
  50 + return NULL;
  51 +}
  52 +
  53 +int qemu_aio_set_fd_handler(int fd,
  54 + IOHandler *io_read,
  55 + IOHandler *io_write,
  56 + AioFlushHandler *io_flush,
  57 + void *opaque)
  58 +{
  59 + AioHandler *node;
  60 +
  61 + node = find_aio_handler(fd);
  62 +
  63 + /* Are we deleting the fd handler? */
  64 + if (!io_read && !io_write) {
  65 + if (node) {
  66 + /* If the lock is held, just mark the node as deleted */
  67 + if (walking_handlers)
  68 + node->deleted = 1;
  69 + else {
  70 + /* Otherwise, delete it for real. We can't just mark it as
  71 + * deleted because deleted nodes are only cleaned up after
  72 + * releasing the walking_handlers lock.
  73 + */
  74 + LIST_REMOVE(node, node);
  75 + qemu_free(node);
  76 + }
  77 + }
  78 + } else {
  79 + if (node == NULL) {
  80 + /* Alloc and insert if it's not already there */
  81 + node = qemu_mallocz(sizeof(AioHandler));
  82 + if (node == NULL)
  83 + return -ENOMEM;
  84 + node->fd = fd;
  85 + LIST_INSERT_HEAD(&aio_handlers, node, node);
  86 + }
  87 + /* Update handler with latest information */
  88 + node->io_read = io_read;
  89 + node->io_write = io_write;
  90 + node->io_flush = io_flush;
  91 + node->opaque = opaque;
  92 + }
  93 +
  94 + qemu_set_fd_handler2(fd, NULL, io_read, io_write, opaque);
  95 +
  96 + return 0;
  97 +}
  98 +
  99 +void qemu_aio_flush(void)
  100 +{
  101 + AioHandler *node;
  102 + int ret;
  103 +
  104 + do {
  105 + ret = 0;
  106 +
  107 + LIST_FOREACH(node, &aio_handlers, node) {
  108 + ret |= node->io_flush(node->opaque);
  109 + }
  110 +
  111 + qemu_aio_wait();
  112 + } while (ret > 0);
  113 +}
  114 +
  115 +void qemu_aio_wait(void)
  116 +{
  117 + int ret;
  118 +
  119 + if (qemu_bh_poll())
  120 + return;
  121 +
  122 + do {
  123 + AioHandler *node;
  124 + fd_set rdfds, wrfds;
  125 + int max_fd = -1;
  126 +
  127 + walking_handlers = 1;
  128 +
  129 + /* fill fd sets */
  130 + LIST_FOREACH(node, &aio_handlers, node) {
  131 + /* If there aren't pending AIO operations, don't invoke callbacks.
  132 + * Otherwise, if there are no AIO requests, qemu_aio_wait() would
  133 + * wait indefinitely.
  134 + */
  135 + if (node->io_flush && node->io_flush(node->opaque) == 0)
  136 + continue;
  137 +
  138 + if (!node->deleted && node->io_read) {
  139 + FD_SET(node->fd, &rdfds);
  140 + max_fd = MAX(max_fd, node->fd + 1);
  141 + }
  142 + if (!node->deleted && node->io_write) {
  143 + FD_SET(node->fd, &wrfds);
  144 + max_fd = MAX(max_fd, node->fd + 1);
  145 + }
  146 + }
  147 +
  148 + walking_handlers = 0;
  149 +
  150 + /* No AIO operations? Get us out of here */
  151 + if (max_fd == -1)
  152 + break;
  153 +
  154 + /* wait until next event */
  155 + ret = select(max_fd, &rdfds, &wrfds, NULL, NULL);
  156 + if (ret == -1 && errno == EINTR)
  157 + continue;
  158 +
  159 + /* if we have any readable fds, dispatch event */
  160 + if (ret > 0) {
  161 + walking_handlers = 1;
  162 +
  163 + /* we have to walk very carefully in case
  164 + * qemu_aio_set_fd_handler is called while we're walking */
  165 + node = LIST_FIRST(&aio_handlers);
  166 + while (node) {
  167 + AioHandler *tmp;
  168 +
  169 + if (!node->deleted &&
  170 + FD_ISSET(node->fd, &rdfds) &&
  171 + node->io_read) {
  172 + node->io_read(node->opaque);
  173 + }
  174 + if (!node->deleted &&
  175 + FD_ISSET(node->fd, &wrfds) &&
  176 + node->io_write) {
  177 + node->io_write(node->opaque);
  178 + }
  179 +
  180 + tmp = node;
  181 + node = LIST_NEXT(node, node);
  182 +
  183 + if (tmp->deleted) {
  184 + LIST_REMOVE(tmp, node);
  185 + qemu_free(tmp);
  186 + }
  187 + }
  188 +
  189 + walking_handlers = 0;
  190 + }
  191 + } while (ret == 0);
  192 +}
... ...
block-raw-posix.c
... ... @@ -101,6 +101,8 @@ typedef struct BDRVRawState {
101 101 #endif
102 102 } BDRVRawState;
103 103  
  104 +static int posix_aio_init(void);
  105 +
104 106 static int fd_open(BlockDriverState *bs);
105 107  
106 108 static int raw_open(BlockDriverState *bs, const char *filename, int flags)
... ... @@ -108,6 +110,8 @@ static int raw_open(BlockDriverState *bs, const char *filename, int flags)
108 110 BDRVRawState *s = bs->opaque;
109 111 int fd, open_flags, ret;
110 112  
  113 + posix_aio_init();
  114 +
111 115 s->lseek_err_cnt = 0;
112 116  
113 117 open_flags = O_BINARY;
... ... @@ -437,13 +441,15 @@ typedef struct RawAIOCB {
437 441 int ret;
438 442 } RawAIOCB;
439 443  
440   -static int aio_sig_fd = -1;
441   -static int aio_sig_num = SIGUSR2;
442   -static RawAIOCB *first_aio; /* AIO issued */
443   -static int aio_initialized = 0;
  444 +typedef struct PosixAioState /* backend state; one instance is kept in posix_aio_state */
  445 +{
  446 + int fd; /* descriptor returned by qemu_signalfd() for the blocked SIGUSR2 */
  447 + RawAIOCB *first_aio; /* head of the list of issued requests; NULL when none are pending */
  448 +} PosixAioState;
444 449  
445   -static void qemu_aio_poll(void *opaque)
  450 +static void posix_aio_read(void *opaque)
446 451 {
  452 + PosixAioState *s = opaque;
447 453 RawAIOCB *acb, **pacb;
448 454 int ret;
449 455 size_t offset;
... ... @@ -457,7 +463,7 @@ static void qemu_aio_poll(void *opaque)
457 463 while (offset < 128) {
458 464 ssize_t len;
459 465  
460   - len = read(aio_sig_fd, sig.buf + offset, 128 - offset);
  466 + len = read(s->fd, sig.buf + offset, 128 - offset);
461 467 if (len == -1 && errno == EINTR)
462 468 continue;
463 469 if (len == -1 && errno == EAGAIN) {
... ... @@ -472,7 +478,7 @@ static void qemu_aio_poll(void *opaque)
472 478 }
473 479  
474 480 for(;;) {
475   - pacb = &first_aio;
  481 + pacb = &s->first_aio;
476 482 for(;;) {
477 483 acb = *pacb;
478 484 if (!acb)
... ... @@ -507,25 +513,37 @@ static void qemu_aio_poll(void *opaque)
507 513 the_end: ;
508 514 }
509 515  
510   -void qemu_aio_init(void)
  516 +static int posix_aio_flush(void *opaque)
511 517 {
512   - sigset_t mask;
  518 + PosixAioState *s = opaque;
  519 + return !!s->first_aio;
  520 +}
513 521  
514   - if (aio_initialized)
515   - return;
  522 +static PosixAioState *posix_aio_state;
516 523  
517   - aio_initialized = 1;
  524 +static int posix_aio_init(void)
  525 +{
  526 + sigset_t mask;
  527 + PosixAioState *s;
  528 +
  529 + if (posix_aio_state)
  530 + return 0;
  531 +
  532 + s = qemu_malloc(sizeof(PosixAioState));
  533 + if (s == NULL)
  534 + return -ENOMEM;
518 535  
519 536 /* Make sure to block AIO signal */
520 537 sigemptyset(&mask);
521   - sigaddset(&mask, aio_sig_num);
  538 + sigaddset(&mask, SIGUSR2);
522 539 sigprocmask(SIG_BLOCK, &mask, NULL);
523 540  
524   - aio_sig_fd = qemu_signalfd(&mask);
  541 + s->first_aio = NULL;
  542 + s->fd = qemu_signalfd(&mask);
525 543  
526   - fcntl(aio_sig_fd, F_SETFL, O_NONBLOCK);
  544 + fcntl(s->fd, F_SETFL, O_NONBLOCK);
527 545  
528   - qemu_set_fd_handler2(aio_sig_fd, NULL, qemu_aio_poll, NULL, NULL);
  546 + qemu_aio_set_fd_handler(s->fd, posix_aio_read, NULL, posix_aio_flush, s);
529 547  
530 548 #if defined(__GLIBC__) && defined(__linux__)
531 549 {
... ... @@ -539,39 +557,9 @@ void qemu_aio_init(void)
539 557 aio_init(&ai);
540 558 }
541 559 #endif
542   -}
543   -
544   -/* Wait for all IO requests to complete. */
545   -void qemu_aio_flush(void)
546   -{
547   - qemu_aio_poll(NULL);
548   - while (first_aio) {
549   - qemu_aio_wait();
550   - }
551   -}
552   -
553   -void qemu_aio_wait(void)
554   -{
555   - int ret;
556   -
557   - if (qemu_bh_poll())
558   - return;
559   -
560   - if (!first_aio)
561   - return;
562   -
563   - do {
564   - fd_set rdfds;
565   -
566   - FD_ZERO(&rdfds);
567   - FD_SET(aio_sig_fd, &rdfds);
  560 + posix_aio_state = s;
568 561  
569   - ret = select(aio_sig_fd + 1, &rdfds, NULL, NULL, NULL);
570   - if (ret == -1 && errno == EINTR)
571   - continue;
572   - } while (ret == 0);
573   -
574   - qemu_aio_poll(NULL);
  562 + return 0;
575 563 }
576 564  
577 565 static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
... ... @@ -588,7 +576,7 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
588 576 if (!acb)
589 577 return NULL;
590 578 acb->aiocb.aio_fildes = s->fd;
591   - acb->aiocb.aio_sigevent.sigev_signo = aio_sig_num;
  579 + acb->aiocb.aio_sigevent.sigev_signo = SIGUSR2;
592 580 acb->aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
593 581 acb->aiocb.aio_buf = buf;
594 582 if (nb_sectors < 0)
... ... @@ -596,8 +584,8 @@ static RawAIOCB *raw_aio_setup(BlockDriverState *bs,
596 584 else
597 585 acb->aiocb.aio_nbytes = nb_sectors * 512;
598 586 acb->aiocb.aio_offset = sector_num * 512;
599   - acb->next = first_aio;
600   - first_aio = acb;
  587 + acb->next = posix_aio_state->first_aio;
  588 + posix_aio_state->first_aio = acb;
601 589 return acb;
602 590 }
603 591  
... ... @@ -688,7 +676,7 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
688 676 }
689 677  
690 678 /* remove the callback from the queue */
691   - pacb = &first_aio;
  679 + pacb = &posix_aio_state->first_aio;
692 680 for(;;) {
693 681 if (*pacb == NULL) {
694 682 break;
... ... @@ -701,21 +689,10 @@ static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
701 689 }
702 690 }
703 691  
704   -# else /* CONFIG_AIO */
705   -
706   -void qemu_aio_init(void)
  692 +#else /* CONFIG_AIO */
  693 +static int posix_aio_init(void)
707 694 {
708 695 }
709   -
710   -void qemu_aio_flush(void)
711   -{
712   -}
713   -
714   -void qemu_aio_wait(void)
715   -{
716   - qemu_bh_poll();
717   -}
718   -
719 696 #endif /* CONFIG_AIO */
720 697  
721 698 static void raw_close(BlockDriverState *bs)
... ... @@ -921,6 +898,8 @@ static int hdev_open(BlockDriverState *bs, const char *filename, int flags)
921 898 BDRVRawState *s = bs->opaque;
922 899 int fd, open_flags, ret;
923 900  
  901 + posix_aio_init();
  902 +
924 903 #ifdef CONFIG_COCOA
925 904 if (strstart(filename, "/dev/cdrom", NULL)) {
926 905 kern_return_t kernResult;
... ...
block-raw-win32.c
... ... @@ -339,19 +339,6 @@ static int raw_create(const char *filename, int64_t total_size,
339 339 return 0;
340 340 }
341 341  
342   -void qemu_aio_init(void)
343   -{
344   -}
345   -
346   -void qemu_aio_flush(void)
347   -{
348   -}
349   -
350   -void qemu_aio_wait(void)
351   -{
352   - qemu_bh_poll();
353   -}
354   -
355 342 BlockDriver bdrv_raw = {
356 343 "raw",
357 344 sizeof(BDRVRawState),
... ...
... ... @@ -1310,8 +1310,6 @@ void bdrv_init(void)
1310 1310 bdrv_register(&bdrv_qcow2);
1311 1311 bdrv_register(&bdrv_parallels);
1312 1312 bdrv_register(&bdrv_nbd);
1313   -
1314   - qemu_aio_init();
1315 1313 }
1316 1314  
1317 1315 void *qemu_aio_get(BlockDriverState *bs, BlockDriverCompletionFunc *cb,
... ...
1 1 #ifndef BLOCK_H
2 2 #define BLOCK_H
3 3  
  4 +#include "qemu-aio.h"
  5 +
4 6 /* block.c */
5 7 typedef struct BlockDriver BlockDriver;
6 8  
... ... @@ -87,10 +89,6 @@ BlockDriverAIOCB *bdrv_aio_write(BlockDriverState *bs, int64_t sector_num,
87 89 BlockDriverCompletionFunc *cb, void *opaque);
88 90 void bdrv_aio_cancel(BlockDriverAIOCB *acb);
89 91  
90   -void qemu_aio_init(void);
91   -void qemu_aio_flush(void);
92   -void qemu_aio_wait(void);
93   -
94 92 int qemu_key_check(BlockDriverState *bs, const char *name);
95 93  
96 94 /* Ensure contents are flushed to disk. */
... ...
qemu-aio.h 0 → 100644
  1 +/*
  2 + * QEMU aio implementation
  3 + *
  4 + * Copyright IBM, Corp. 2008
  5 + *
  6 + * Authors:
  7 + * Anthony Liguori <aliguori@us.ibm.com>
  8 + *
  9 + * This work is licensed under the terms of the GNU GPL, version 2. See
  10 + * the COPYING file in the top-level directory.
  11 + *
  12 + */
  13 +
  14 +#ifndef QEMU_AIO_H
  15 +#define QEMU_AIO_H
  16 +
  17 +#include "qemu-common.h"
  18 +#include "qemu-char.h"
  19 +
  20 +/* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
  21 +typedef int (AioFlushHandler)(void *opaque);
  22 +
  23 +/* Flush any pending AIO operation. This function will block until all
  24 + * outstanding AIO operations have been completed or cancelled. */
  25 +void qemu_aio_flush(void);
  26 +
  27 +/* Wait for a single AIO completion to occur. This function will wait until a
  28 + * single AIO operation has completed. It is intended to be used as a looping
  29 + * primitive when simulating synchronous IO based on asynchronous IO. */
  30 +void qemu_aio_wait(void);
  31 +
  32 +/* Register a file descriptor and associated callbacks. Behaves very similarly
  33 + * to qemu_set_fd_handler2. Unlike qemu_set_fd_handler2, these callbacks will
  34 + * be invoked when using either qemu_aio_wait() or qemu_aio_flush().
  35 + *
  36 + * Code that invokes AIO completion functions should rely on this function
  37 + * instead of qemu_set_fd_handler[2].
  38 + */
  39 +int qemu_aio_set_fd_handler(int fd,
  40 + IOHandler *io_read,
  41 + IOHandler *io_write,
  42 + AioFlushHandler *io_flush,
  43 + void *opaque);
  44 +
  45 +#endif
... ...