async.c 10.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
/*
 * Server-side async I/O support
 *
 * Copyright (C) 2007 Alexandre Julliard
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>

#include "ntstatus.h"
#define WIN32_NO_STATUS
#include "windef.h"
#include "winternl.h"

#include "object.h"
#include "file.h"
#include "request.h"

struct async
{
    struct object        obj;             /* object header */
    struct thread       *thread;          /* owning thread */
39 40
    struct list          queue_entry;     /* entry in async queue list */
    struct async_queue  *queue;           /* queue containing this async */
41
    unsigned int         status;          /* current status */
42
    struct timeout_user *timeout;
43
    unsigned int         timeout_status;  /* status to report upon timeout */
44
    struct event        *event;
45
    struct completion   *completion;
46
    apc_param_t          comp_key;
47
    async_data_t         data;            /* data for async I/O call */
48 49 50 51 52 53 54 55 56
};

static void async_dump( struct object *obj, int verbose );
static void async_destroy( struct object *obj );

static const struct object_ops async_ops =
{
    sizeof(struct async),      /* size */
    async_dump,                /* dump */
57
    no_get_type,               /* get_type */
58 59 60 61 62 63 64
    no_add_queue,              /* add_queue */
    NULL,                      /* remove_queue */
    NULL,                      /* signaled */
    NULL,                      /* satisfied */
    no_signal,                 /* signal */
    no_get_fd,                 /* get_fd */
    no_map_access,             /* map_access */
65 66
    default_get_sd,            /* get_sd */
    default_set_sd,            /* set_sd */
67
    no_lookup_name,            /* lookup_name */
68
    no_open_file,              /* open_file */
69 70 71 72
    no_close_handle,           /* close_handle */
    async_destroy              /* destroy */
};

73 74 75 76 77 78 79 80 81 82 83 84 85 86

struct async_queue
{
    struct object        obj;             /* object header */
    struct fd           *fd;              /* file descriptor owning this queue */
    struct list          queue;           /* queue of async objects */
};

static void async_queue_dump( struct object *obj, int verbose );

static const struct object_ops async_queue_ops =
{
    sizeof(struct async_queue),      /* size */
    async_queue_dump,                /* dump */
87
    no_get_type,                     /* get_type */
88 89 90 91 92 93 94
    no_add_queue,                    /* add_queue */
    NULL,                            /* remove_queue */
    NULL,                            /* signaled */
    NULL,                            /* satisfied */
    no_signal,                       /* signal */
    no_get_fd,                       /* get_fd */
    no_map_access,                   /* map_access */
95 96
    default_get_sd,                  /* get_sd */
    default_set_sd,                  /* set_sd */
97 98 99
    no_lookup_name,                  /* lookup_name */
    no_open_file,                    /* open_file */
    no_close_handle,                 /* close_handle */
100
    no_destroy                       /* destroy */
101 102 103
};


104 105 106 107 108
static inline void async_reselect( struct async *async )
{
    if (async->queue->fd) fd_reselect_async( async->queue->fd, async->queue );
}

109 110 111 112 113 114 115 116 117 118 119 120
static void async_dump( struct object *obj, int verbose )
{
    struct async *async = (struct async *)obj;
    assert( obj->ops == &async_ops );
    fprintf( stderr, "Async thread=%p\n", async->thread );
}

static void async_destroy( struct object *obj )
{
    struct async *async = (struct async *)obj;
    assert( obj->ops == &async_ops );

121 122 123
    list_remove( &async->queue_entry );
    async_reselect( async );

124
    if (async->timeout) remove_timeout_user( async->timeout );
125
    if (async->event) release_object( async->event );
126
    if (async->completion) release_object( async->completion );
127
    release_object( async->queue );
128 129 130
    release_object( async->thread );
}

131 132 133 134 135 136 137
static void async_queue_dump( struct object *obj, int verbose )
{
    struct async_queue *async_queue = (struct async_queue *)obj;
    assert( obj->ops == &async_queue_ops );
    fprintf( stderr, "Async queue fd=%p\n", async_queue->fd );
}

138
/* notifies client thread of new status of its async request */
139
void async_terminate( struct async *async, unsigned int status )
140 141 142
{
    apc_call_t data;

143 144 145 146 147 148 149 150 151
    assert( status != STATUS_PENDING );

    if (async->status != STATUS_PENDING)
    {
        /* already terminated, just update status */
        async->status = status;
        return;
    }

152 153
    memset( &data, 0, sizeof(data) );
    data.type            = APC_ASYNC_IO;
154 155 156
    data.async_io.func   = async->data.callback;
    data.async_io.user   = async->data.arg;
    data.async_io.sb     = async->data.iosb;
157 158
    data.async_io.status = status;
    thread_queue_apc( async->thread, &async->obj, &data );
159 160 161
    async->status = status;
    async_reselect( async );
    release_object( async );  /* so that it gets destroyed when the async is done */
162 163 164 165 166 167 168 169
}

/* callback for timeout on an async request */
static void async_timeout( void *private )
{
    struct async *async = private;

    async->timeout = NULL;
170
    async_terminate( async, async->timeout_status );
171 172
}

173 174 175 176 177 178 179 180 181 182 183 184 185
/* create a new async queue for a given fd */
struct async_queue *create_async_queue( struct fd *fd )
{
    struct async_queue *queue = alloc_object( &async_queue_ops );

    if (queue)
    {
        queue->fd = fd;
        list_init( &queue->queue );
    }
    return queue;
}

186 187 188 189 190 191 192 193 194
/* free an async queue, cancelling all async operations */
void free_async_queue( struct async_queue *queue )
{
    if (!queue) return;
    queue->fd = NULL;
    async_wake_up( queue, STATUS_HANDLES_CLOSED );
    release_object( queue );
}

195
/* create an async on a given queue of a fd */
196
struct async *create_async( struct thread *thread, struct async_queue *queue, const async_data_t *data )
197
{
198 199
    struct event *event = NULL;
    struct async *async;
200

201 202 203 204 205 206 207 208
    if (data->event && !(event = get_event_obj( thread->process, data->event, EVENT_MODIFY_STATE )))
        return NULL;

    if (!(async = alloc_object( &async_ops )))
    {
        if (event) release_object( event );
        return NULL;
    }
209

210 211 212 213
    async->thread  = (struct thread *)grab_object( thread );
    async->event   = event;
    async->status  = STATUS_PENDING;
    async->data    = *data;
214
    async->timeout = NULL;
215
    async->queue   = (struct async_queue *)grab_object( queue );
216
    async->completion = NULL;
217
    if (queue->fd) async->completion = fd_get_completion( queue->fd, &async->comp_key );
218

219
    list_add_tail( &queue->queue, &async->queue_entry );
220
    grab_object( async );
221

222
    if (queue->fd) set_fd_signaled( queue->fd, 0 );
223
    if (event) reset_event( event );
224 225 226
    return async;
}

227
/* set the timeout of an async operation */
228
void async_set_timeout( struct async *async, timeout_t timeout, unsigned int status )
229 230
{
    if (async->timeout) remove_timeout_user( async->timeout );
231
    if (timeout != TIMEOUT_INFINITE) async->timeout = add_timeout_user( timeout, async_timeout, async );
232
    else async->timeout = NULL;
233
    async->timeout_status = status;
234 235
}

236
/* store the result of the client-side async callback */
237
void async_set_result( struct object *obj, unsigned int status, unsigned int total, client_ptr_t apc )
238 239 240 241 242
{
    struct async *async = (struct async *)obj;

    if (obj->ops != &async_ops) return;  /* in case the client messed up the APC results */

243 244 245
    assert( async->status != STATUS_PENDING );  /* it must have been woken up if we get a result */

    if (status == STATUS_PENDING)  /* restart it */
246
    {
247 248 249 250 251 252 253 254
        status = async->status;
        async->status = STATUS_PENDING;
        grab_object( async );

        if (status != STATUS_ALERTED)  /* it was terminated in the meantime */
            async_terminate( async, status );
        else
            async_reselect( async );
255 256 257
    }
    else
    {
258 259 260
        if (async->timeout) remove_timeout_user( async->timeout );
        async->timeout = NULL;
        async->status = status;
261 262
        if (async->completion && async->data.cvalue)
            add_completion( async->completion, async->comp_key, async->data.cvalue, status, total );
263
        if (apc)
264 265
        {
            apc_call_t data;
266
            memset( &data, 0, sizeof(data) );
267
            data.type         = APC_USER;
268
            data.user.func    = apc;
269 270
            data.user.args[0] = async->data.arg;
            data.user.args[1] = async->data.iosb;
271 272 273
            data.user.args[2] = 0;
            thread_queue_apc( async->thread, NULL, &data );
        }
274
        if (async->event) set_event( async->event );
275
        else if (async->queue->fd) set_fd_signaled( async->queue->fd, 1 );
276 277 278
    }
}

279 280 281 282 283 284
/* check if there are any queued async operations */
int async_queued( struct async_queue *queue )
{
    return queue && list_head( &queue->queue );
}

285 286
/* check if an async operation is waiting to be alerted */
int async_waiting( struct async_queue *queue )
287
{
288 289 290 291 292 293 294
    struct list *ptr;
    struct async *async;

    if (!queue) return 0;
    if (!(ptr = list_head( &queue->queue ))) return 0;
    async = LIST_ENTRY( ptr, struct async, queue_entry );
    return async->status == STATUS_PENDING;
295 296
}

297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
int async_wake_up_by( struct async_queue *queue, struct process *process,
                      struct thread *thread, client_ptr_t iosb, unsigned int status )
{
    struct list *ptr, *next;
    int woken = 0;

    if (!queue || (!process && !thread && !iosb)) return 0;

    LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
    {
        struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
        if ( (!process || async->thread->process == process) &&
             (!thread || async->thread == thread) &&
             (!iosb || async->data.iosb == iosb) )
        {
            async_terminate( async, status );
            woken++;
        }
    }
    return woken;
}

319 320
/* wake up async operations on the queue */
void async_wake_up( struct async_queue *queue, unsigned int status )
321 322 323
{
    struct list *ptr, *next;

324 325 326
    if (!queue) return;

    LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
327 328 329
    {
        struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
        async_terminate( async, status );
330
        if (status == STATUS_ALERTED) break;  /* only wake up the first one */
331 332
    }
}