diff options
| author | 2023-02-06 11:29:46 +0100 | |
|---|---|---|
| committer | 2023-02-06 11:29:46 +0100 | |
| commit | 6bcc6bfa1ca871d2aae71e2b76d6eca36d24b5e5 (patch) | |
| tree | bba1fa629f92f3ee5c70a64218159f0db09b62fd | |
| parent | Backport from xs. (diff) | |
| download | snac2-6bcc6bfa1ca871d2aae71e2b76d6eca36d24b5e5.tar.gz snac2-6bcc6bfa1ca871d2aae71e2b76d6eca36d24b5e5.tar.xz snac2-6bcc6bfa1ca871d2aae71e2b76d6eca36d24b5e5.zip | |
New functions job_post() and job_wait() (untested).
| -rw-r--r-- | httpd.c | 86 |
1 files changed, 85 insertions, 1 deletions
| @@ -13,6 +13,7 @@ | |||
| 13 | 13 | ||
| 14 | #include <setjmp.h> | 14 | #include <setjmp.h> |
| 15 | #include <pthread.h> | 15 | #include <pthread.h> |
| 16 | #include <semaphore.h> | ||
| 16 | 17 | ||
| 17 | 18 | ||
| 18 | /* nodeinfo 2.0 template */ | 19 | /* nodeinfo 2.0 template */ |
| @@ -319,6 +320,80 @@ static void *connection_thread(void *arg) | |||
| 319 | } | 320 | } |
| 320 | 321 | ||
| 321 | 322 | ||
| 323 | /** job control **/ | ||
| 324 | |||
| 325 | /* mutex to access the lists of jobs */ | ||
| 326 | static pthread_mutex_t job_mutex; | ||
| 327 | |||
| 328 | /* semaphre to trigger job processing */ | ||
| 329 | static sem_t job_sem; | ||
| 330 | |||
| 331 | /* list of input sockets */ | ||
| 332 | xs_list *job_sockets = NULL; | ||
| 333 | |||
| 334 | /* list of queue items */ | ||
| 335 | xs_list *job_qitems = NULL; | ||
| 336 | |||
| 337 | |||
| 338 | void job_post(FILE *socket, const xs_dict *q_item) | ||
| 339 | /* posts a job, being an input connection or another queue item */ | ||
| 340 | { | ||
| 341 | /* lock the mutex */ | ||
| 342 | pthread_mutex_lock(&job_mutex); | ||
| 343 | |||
| 344 | /* add to the appropriate fifo */ | ||
| 345 | if (socket != NULL) { | ||
| 346 | xs *d = xs_data_new(&socket, sizeof(FILE *)); | ||
| 347 | job_sockets = xs_list_append(job_sockets, d); | ||
| 348 | } | ||
| 349 | else | ||
| 350 | if (q_item != NULL) | ||
| 351 | job_qitems = xs_list_append(job_qitems, q_item); | ||
| 352 | |||
| 353 | /* unlock the mutex */ | ||
| 354 | pthread_mutex_unlock(&job_mutex); | ||
| 355 | |||
| 356 | /* ask for someone to attend it */ | ||
| 357 | sem_post(&job_sem); | ||
| 358 | } | ||
| 359 | |||
| 360 | |||
| 361 | int job_wait(FILE **socket, xs_dict **q_item) | ||
| 362 | /* waits for an available job; returns 0 if nothing left to do */ | ||
| 363 | { | ||
| 364 | int done = 1; | ||
| 365 | |||
| 366 | *socket = NULL; | ||
| 367 | *q_item = NULL; | ||
| 368 | |||
| 369 | if (sem_wait(&job_sem) == 0) { | ||
| 370 | /* lock the mutex */ | ||
| 371 | pthread_mutex_lock(&job_mutex); | ||
| 372 | |||
| 373 | /* try first to get a socket to process */ | ||
| 374 | xs *job_socket = NULL; | ||
| 375 | job_sockets = xs_list_shift(job_sockets, &job_socket); | ||
| 376 | |||
| 377 | /* if empty, try a q_item */ | ||
| 378 | if (job_socket == NULL) | ||
| 379 | job_qitems = xs_list_shift(job_qitems, q_item); | ||
| 380 | |||
| 381 | /* unlock the mutex */ | ||
| 382 | pthread_mutex_unlock(&job_mutex); | ||
| 383 | |||
| 384 | if (job_socket != NULL) { | ||
| 385 | xs_data_get(job_socket, socket); | ||
| 386 | done = 0; | ||
| 387 | } | ||
| 388 | else | ||
| 389 | if (*q_item != NULL) | ||
| 390 | done = 0; | ||
| 391 | } | ||
| 392 | |||
| 393 | return done; | ||
| 394 | } | ||
| 395 | |||
| 396 | |||
| 322 | #ifndef MAX_THREADS | 397 | #ifndef MAX_THREADS |
| 323 | #define MAX_THREADS 256 | 398 | #define MAX_THREADS 256 |
| 324 | #endif | 399 | #endif |
| @@ -348,8 +423,14 @@ void httpd(void) | |||
| 348 | 423 | ||
| 349 | srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT)); | 424 | srv_log(xs_fmt("httpd start %s:%d %s", address, port, USER_AGENT)); |
| 350 | 425 | ||
| 351 | /* thread creation */ | 426 | /* initialize the job control engine */ |
| 427 | pthread_mutex_init(&job_mutex, NULL); | ||
| 428 | sem_init(&job_sem, 0, 0); | ||
| 429 | job_sockets = xs_list_new(); | ||
| 430 | job_qitems = xs_list_new(); | ||
| 431 | |||
| 352 | #ifdef _SC_NPROCESSORS_ONLN | 432 | #ifdef _SC_NPROCESSORS_ONLN |
| 433 | /* get number of CPUs on the machine */ | ||
| 353 | n_threads = sysconf(_SC_NPROCESSORS_ONLN); | 434 | n_threads = sysconf(_SC_NPROCESSORS_ONLN); |
| 354 | #endif | 435 | #endif |
| 355 | 436 | ||
| @@ -380,5 +461,8 @@ void httpd(void) | |||
| 380 | /* wait for the background thread to end */ | 461 | /* wait for the background thread to end */ |
| 381 | pthread_join(threads[0], NULL); | 462 | pthread_join(threads[0], NULL); |
| 382 | 463 | ||
| 464 | job_sockets = xs_free(job_sockets); | ||
| 465 | job_qitems = xs_free(job_qitems); | ||
| 466 | |||
| 383 | srv_log(xs_fmt("httpd stop %s:%d", address, port)); | 467 | srv_log(xs_fmt("httpd stop %s:%d", address, port)); |
| 384 | } | 468 | } |