diff options
| author | 2023-02-07 13:31:48 +0100 | |
|---|---|---|
| committer | 2023-02-07 13:31:48 +0100 | |
| commit | 4cca157641d5f91bde51baf437a3179e39d0b601 (patch) | |
| tree | 84e6cf02972ea3836fbd7e92f89d69f47ccd88b7 /activitypub.c | |
| parent | Log unexpected q_item types. (diff) | |
| download | snac2-4cca157641d5f91bde51baf437a3179e39d0b601.tar.gz snac2-4cca157641d5f91bde51baf437a3179e39d0b601.tar.xz snac2-4cca157641d5f91bde51baf437a3179e39d0b601.zip | |
Output messages are now processed by the pool of threads.
Diffstat (limited to 'activitypub.c')
| -rw-r--r-- | activitypub.c | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/activitypub.c b/activitypub.c index af0a9a1..b54845d 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -1195,6 +1195,44 @@ void process_queue_item(xs_dict *q_item) | |||
| 1195 | char *type = xs_dict_get(q_item, "type"); | 1195 | char *type = xs_dict_get(q_item, "type"); |
| 1196 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); | 1196 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); |
| 1197 | 1197 | ||
| 1198 | if (strcmp(type, "output") == 0) { | ||
| 1199 | int status; | ||
| 1200 | xs_str *inbox = xs_dict_get(q_item, "inbox"); | ||
| 1201 | xs_str *keyid = xs_dict_get(q_item, "keyid"); | ||
| 1202 | xs_str *seckey = xs_dict_get(q_item, "seckey"); | ||
| 1203 | xs_dict *msg = xs_dict_get(q_item, "message"); | ||
| 1204 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1205 | xs *payload = NULL; | ||
| 1206 | int p_size = 0; | ||
| 1207 | |||
| 1208 | if (xs_is_null(inbox) || xs_is_null(msg) || xs_is_null(keyid) || xs_is_null(seckey)) { | ||
| 1209 | srv_log(xs_fmt("output message error: missing fields")); | ||
| 1210 | return; | ||
| 1211 | } | ||
| 1212 | |||
| 1213 | /* deliver */ | ||
| 1214 | status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); | ||
| 1215 | |||
| 1216 | srv_log(xs_fmt("output message: sent to inbox %s %d", inbox, status)); | ||
| 1217 | |||
| 1218 | if (!valid_status(status)) { | ||
| 1219 | retries++; | ||
| 1220 | |||
| 1221 | /* error sending; requeue? */ | ||
| 1222 | if (status == 404 || status == 410) | ||
| 1223 | /* explicit error: discard */ | ||
| 1224 | srv_log(xs_fmt("output message: fatal error %s %d", inbox, status)); | ||
| 1225 | else | ||
| 1226 | if (retries > queue_retry_max) | ||
| 1227 | srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); | ||
| 1228 | else { | ||
| 1229 | /* requeue */ | ||
| 1230 | enqueue_output_raw(keyid, seckey, msg, inbox, retries); | ||
| 1231 | srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); | ||
| 1232 | } | ||
| 1233 | } | ||
| 1234 | } | ||
| 1235 | else | ||
| 1198 | if (strcmp(type, "email") == 0) { | 1236 | if (strcmp(type, "email") == 0) { |
| 1199 | /* send this email */ | 1237 | /* send this email */ |
| 1200 | xs_str *msg = xs_dict_get(q_item, "message"); | 1238 | xs_str *msg = xs_dict_get(q_item, "message"); |