diff options
| author | 2023-02-07 13:31:48 +0100 | |
|---|---|---|
| committer | 2023-02-07 13:31:48 +0100 | |
| commit | 4cca157641d5f91bde51baf437a3179e39d0b601 (patch) | |
| tree | 84e6cf02972ea3836fbd7e92f89d69f47ccd88b7 | |
| parent | Log unexpected q_item types. (diff) | |
| download | snac2-4cca157641d5f91bde51baf437a3179e39d0b601.tar.gz snac2-4cca157641d5f91bde51baf437a3179e39d0b601.tar.xz snac2-4cca157641d5f91bde51baf437a3179e39d0b601.zip | |
Output messages are now processed by the pool of threads.
| -rw-r--r-- | activitypub.c | 38 | ||||
| -rw-r--r-- | data.c | 30 | ||||
| -rw-r--r-- | snac.h | 2 |
3 files changed, 60 insertions, 10 deletions
diff --git a/activitypub.c b/activitypub.c index af0a9a1..b54845d 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -1195,6 +1195,44 @@ void process_queue_item(xs_dict *q_item) | |||
| 1195 | char *type = xs_dict_get(q_item, "type"); | 1195 | char *type = xs_dict_get(q_item, "type"); |
| 1196 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); | 1196 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); |
| 1197 | 1197 | ||
| 1198 | if (strcmp(type, "output") == 0) { | ||
| 1199 | int status; | ||
| 1200 | xs_str *inbox = xs_dict_get(q_item, "inbox"); | ||
| 1201 | xs_str *keyid = xs_dict_get(q_item, "keyid"); | ||
| 1202 | xs_str *seckey = xs_dict_get(q_item, "seckey"); | ||
| 1203 | xs_dict *msg = xs_dict_get(q_item, "message"); | ||
| 1204 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1205 | xs *payload = NULL; | ||
| 1206 | int p_size = 0; | ||
| 1207 | |||
| 1208 | if (xs_is_null(inbox) || xs_is_null(msg) || xs_is_null(keyid) || xs_is_null(seckey)) { | ||
| 1209 | srv_log(xs_fmt("output message error: missing fields")); | ||
| 1210 | return; | ||
| 1211 | } | ||
| 1212 | |||
| 1213 | /* deliver */ | ||
| 1214 | status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); | ||
| 1215 | |||
| 1216 | srv_log(xs_fmt("output message: sent to inbox %s %d", inbox, status)); | ||
| 1217 | |||
| 1218 | if (!valid_status(status)) { | ||
| 1219 | retries++; | ||
| 1220 | |||
| 1221 | /* error sending; requeue? */ | ||
| 1222 | if (status == 404 || status == 410) | ||
| 1223 | /* explicit error: discard */ | ||
| 1224 | srv_log(xs_fmt("output message: fatal error %s %d", inbox, status)); | ||
| 1225 | else | ||
| 1226 | if (retries > queue_retry_max) | ||
| 1227 | srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); | ||
| 1228 | else { | ||
| 1229 | /* requeue */ | ||
| 1230 | enqueue_output_raw(keyid, seckey, msg, inbox, retries); | ||
| 1231 | srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); | ||
| 1232 | } | ||
| 1233 | } | ||
| 1234 | } | ||
| 1235 | else | ||
| 1198 | if (strcmp(type, "email") == 0) { | 1236 | if (strcmp(type, "email") == 0) { |
| 1199 | /* send this email */ | 1237 | /* send this email */ |
| 1200 | xs_str *msg = xs_dict_get(q_item, "message"); | 1238 | xs_str *msg = xs_dict_get(q_item, "message"); |
| @@ -1373,25 +1373,35 @@ void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries) | |||
| 1373 | } | 1373 | } |
| 1374 | 1374 | ||
| 1375 | 1375 | ||
| 1376 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) | 1376 | void enqueue_output_raw(const char *keyid, const char *seckey, |
| 1377 | xs_dict *msg, xs_str *inbox, int retries) | ||
| 1377 | /* enqueues an output message to an inbox */ | 1378 | /* enqueues an output message to an inbox */ |
| 1378 | { | 1379 | { |
| 1379 | if (xs_startswith(inbox, snac->actor)) { | ||
| 1380 | snac_debug(snac, 1, xs_str_new("refusing enqueue to myself")); | ||
| 1381 | return; | ||
| 1382 | } | ||
| 1383 | |||
| 1384 | xs *qmsg = _new_qmsg("output", msg, retries); | 1380 | xs *qmsg = _new_qmsg("output", msg, retries); |
| 1385 | char *ntid = xs_dict_get(qmsg, "ntid"); | 1381 | char *ntid = xs_dict_get(qmsg, "ntid"); |
| 1386 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | 1382 | xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); |
| 1387 | 1383 | ||
| 1388 | qmsg = xs_dict_append(qmsg, "inbox", inbox); | 1384 | qmsg = xs_dict_append(qmsg, "inbox", inbox); |
| 1389 | qmsg = xs_dict_append(qmsg, "keyid", snac->actor); | 1385 | qmsg = xs_dict_append(qmsg, "keyid", keyid); |
| 1390 | qmsg = xs_dict_append(qmsg, "seckey", xs_dict_get(snac->key, "secret")); | 1386 | qmsg = xs_dict_append(qmsg, "seckey", seckey); |
| 1391 | 1387 | ||
| 1392 | qmsg = _enqueue_put(fn, qmsg); | 1388 | qmsg = _enqueue_put(fn, qmsg); |
| 1393 | 1389 | ||
| 1394 | snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); | 1390 | srv_debug(1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries)); |
| 1391 | } | ||
| 1392 | |||
| 1393 | |||
| 1394 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) | ||
| 1395 | /* enqueues an output message to an inbox */ | ||
| 1396 | { | ||
| 1397 | if (xs_startswith(inbox, snac->actor)) { | ||
| 1398 | snac_debug(snac, 1, xs_str_new("refusing enqueue to myself")); | ||
| 1399 | return; | ||
| 1400 | } | ||
| 1401 | |||
| 1402 | char *seckey = xs_dict_get(snac->key, "secret"); | ||
| 1403 | |||
| 1404 | enqueue_output_raw(snac->actor, seckey, msg, inbox, retries); | ||
| 1395 | } | 1405 | } |
| 1396 | 1406 | ||
| 1397 | 1407 | ||
| @@ -129,6 +129,8 @@ int history_del(snac *snac, char *id); | |||
| 129 | d_char *history_list(snac *snac); | 129 | d_char *history_list(snac *snac); |
| 130 | 130 | ||
| 131 | void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); | 131 | void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); |
| 132 | void enqueue_output_raw(const char *keyid, const char *seckey, | ||
| 133 | xs_dict *msg, xs_str *inbox, int retries); | ||
| 132 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); | 134 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); |
| 133 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); | 135 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); |
| 134 | void enqueue_email(xs_str *msg, int retries); | 136 | void enqueue_email(xs_str *msg, int retries); |