summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar default2023-03-02 12:38:02 +0100
committerGravatar default2023-03-02 12:38:02 +0100
commit5036cb5e1134d99c967f0de5057801a9b0af96d9 (patch)
treef2bb1aa9cd47ddce5d99c93ee3b518b76986a9e0
parentShow a bigger piece of a connection error. (diff)
downloadsnac2-5036cb5e1134d99c967f0de5057801a9b0af96d9.tar.gz
snac2-5036cb5e1134d99c967f0de5057801a9b0af96d9.tar.xz
snac2-5036cb5e1134d99c967f0de5057801a9b0af96d9.zip
Connection jobs are treated as urgent.
-rw-r--r--activitypub.c2
-rw-r--r--data.c2
-rw-r--r--httpd.c16
-rw-r--r--snac.h2
4 files changed, 13 insertions, 9 deletions
diff --git a/activitypub.c b/activitypub.c
index ee31ecd..9c47590 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -1316,7 +1316,7 @@ int process_queue(void)
1316 xs *q_item = dequeue(fn); 1316 xs *q_item = dequeue(fn);
1317 1317
1318 if (q_item != NULL) { 1318 if (q_item != NULL) {
1319 job_post(q_item); 1319 job_post(q_item, 0);
1320 cnt++; 1320 cnt++;
1321 } 1321 }
1322 } 1322 }
diff --git a/data.c b/data.c
index 4403bf4..71ebf68 100644
--- a/data.c
+++ b/data.c
@@ -1497,7 +1497,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey,
1497 1497
1498 /* if it's to be sent right now, bypass the disk queue and post the job */ 1498 /* if it's to be sent right now, bypass the disk queue and post the job */
1499 if (retries == 0 && job_fifo_ready()) 1499 if (retries == 0 && job_fifo_ready())
1500 job_post(qmsg); 1500 job_post(qmsg, 0);
1501 else { 1501 else {
1502 qmsg = _enqueue_put(fn, qmsg); 1502 qmsg = _enqueue_put(fn, qmsg);
1503 srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); 1503 srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
diff --git a/httpd.c b/httpd.c
index 1d91b34..d32903e 100644
--- a/httpd.c
+++ b/httpd.c
@@ -262,7 +262,7 @@ int job_fifo_ready(void)
262} 262}
263 263
264 264
265void job_post(const xs_val *job) 265void job_post(const xs_val *job, int urgent)
266/* posts a job for the threads to process it */ 266/* posts a job for the threads to process it */
267{ 267{
268 if (job != NULL) { 268 if (job != NULL) {
@@ -270,8 +270,12 @@ void job_post(const xs_val *job)
270 pthread_mutex_lock(&job_mutex); 270 pthread_mutex_lock(&job_mutex);
271 271
272 /* add to the fifo */ 272 /* add to the fifo */
273 if (job_fifo != NULL) 273 if (job_fifo != NULL) {
274 job_fifo = xs_list_append(job_fifo, job); 274 if (urgent)
275 job_fifo = xs_list_insert(job_fifo, 0, job);
276 else
277 job_fifo = xs_list_append(job_fifo, job);
278 }
275 279
276 /* unlock the mutex */ 280 /* unlock the mutex */
277 pthread_mutex_unlock(&job_mutex); 281 pthread_mutex_unlock(&job_mutex);
@@ -386,7 +390,7 @@ static void *background_thread(void *arg)
386 390
387 xs *q_item = xs_dict_new(); 391 xs *q_item = xs_dict_new();
388 q_item = xs_dict_append(q_item, "type", "purge"); 392 q_item = xs_dict_append(q_item, "type", "purge");
389 job_post(q_item); 393 job_post(q_item, 0);
390 } 394 }
391 395
392 if (cnt == 0) { 396 if (cnt == 0) {
@@ -485,7 +489,7 @@ void httpd(void)
485 489
486 if (f != NULL) { 490 if (f != NULL) {
487 xs *job = xs_data_new(&f, sizeof(FILE *)); 491 xs *job = xs_data_new(&f, sizeof(FILE *));
488 job_post(job); 492 job_post(job, 1);
489 } 493 }
490 else 494 else
491 break; 495 break;
@@ -496,7 +500,7 @@ void httpd(void)
496 500
497 /* send as many empty jobs as working threads */ 501 /* send as many empty jobs as working threads */
498 for (n = 1; n < n_threads; n++) 502 for (n = 1; n < n_threads; n++)
499 job_post(NULL); 503 job_post(NULL, 0);
500 504
501 /* wait for all the threads to exit */ 505 /* wait for all the threads to exit */
502 for (n = 0; n < n_threads; n++) 506 for (n = 0; n < n_threads; n++)
diff --git a/snac.h b/snac.h
index 9ea3619..6766c55 100644
--- a/snac.h
+++ b/snac.h
@@ -218,5 +218,5 @@ int adduser(const char *uid);
218int resetpwd(snac *snac); 218int resetpwd(snac *snac);
219 219
220int job_fifo_ready(void); 220int job_fifo_ready(void);
221void job_post(const xs_val *job); 221void job_post(const xs_val *job, int urgent);
222void job_wait(xs_val **job); 222void job_wait(xs_val **job);