diff options
| author | 2023-02-02 05:21:16 +0100 | |
|---|---|---|
| committer | 2023-02-02 05:21:16 +0100 | |
| commit | 928f22fbbaec5d52836b217ec7a281b880b23f92 (patch) | |
| tree | 07990b389795025cbd184dfc79d8807ae96d9a5a | |
| parent | New function queue() (the global queue). (diff) | |
| download | snac2-928f22fbbaec5d52836b217ec7a281b880b23f92.tar.gz snac2-928f22fbbaec5d52836b217ec7a281b880b23f92.tar.xz snac2-928f22fbbaec5d52836b217ec7a281b880b23f92.zip | |
Email notifications have been moved to the global queue.
| -rw-r--r-- | activitypub.c | 46 | ||||
| -rw-r--r-- | data.c | 6 | ||||
| -rw-r--r-- | httpd.c | 3 | ||||
| -rw-r--r-- | snac.h | 7 |
4 files changed, 37 insertions, 25 deletions
diff --git a/activitypub.c b/activitypub.c index 5cc059d..9c9ea7c 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -839,7 +839,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg) | |||
| 839 | body = xs_str_cat(body, s1); | 839 | body = xs_str_cat(body, s1); |
| 840 | } | 840 | } |
| 841 | 841 | ||
| 842 | enqueue_email(snac, body, 0); | 842 | enqueue_email(body, 0); |
| 843 | } | 843 | } |
| 844 | 844 | ||
| 845 | 845 | ||
| @@ -1137,26 +1137,6 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) | |||
| 1137 | } | 1137 | } |
| 1138 | } | 1138 | } |
| 1139 | } | 1139 | } |
| 1140 | else | ||
| 1141 | if (strcmp(type, "email") == 0) { | ||
| 1142 | /* send this email */ | ||
| 1143 | xs_str *msg = xs_dict_get(q_item, "message"); | ||
| 1144 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1145 | |||
| 1146 | if (!send_email(msg)) | ||
| 1147 | snac_debug(snac, 1, xs_fmt("email message sent")); | ||
| 1148 | else { | ||
| 1149 | if (retries > queue_retry_max) | ||
| 1150 | snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); | ||
| 1151 | else { | ||
| 1152 | /* requeue */ | ||
| 1153 | snac_log(snac, xs_fmt( | ||
| 1154 | "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); | ||
| 1155 | |||
| 1156 | enqueue_email(snac, msg, retries + 1); | ||
| 1157 | } | ||
| 1158 | } | ||
| 1159 | } | ||
| 1160 | } | 1140 | } |
| 1161 | 1141 | ||
| 1162 | 1142 | ||
| @@ -1184,6 +1164,30 @@ void process_user_queue(snac *snac) | |||
| 1184 | void process_queue_item(xs_dict *q_item) | 1164 | void process_queue_item(xs_dict *q_item) |
| 1185 | /* processes an item from the global queue */ | 1165 | /* processes an item from the global queue */ |
| 1186 | { | 1166 | { |
| 1167 | char *type = xs_dict_get(q_item, "type"); | ||
| 1168 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); | ||
| 1169 | |||
| 1170 | if (strcmp(type, "email") == 0) { | ||
| 1171 | /* send this email */ | ||
| 1172 | xs_str *msg = xs_dict_get(q_item, "message"); | ||
| 1173 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1174 | |||
| 1175 | if (!send_email(msg)) | ||
| 1176 | srv_debug(1, xs_fmt("email message sent")); | ||
| 1177 | else { | ||
| 1178 | retries++; | ||
| 1179 | |||
| 1180 | if (retries > queue_retry_max) | ||
| 1181 | srv_log(xs_fmt("process_queue email giving up (errno: %d)", errno)); | ||
| 1182 | else { | ||
| 1183 | /* requeue */ | ||
| 1184 | srv_log(xs_fmt( | ||
| 1185 | "process_queue email requeue #%d (errno: %d)", retries, errno)); | ||
| 1186 | |||
| 1187 | enqueue_email(msg, retries); | ||
| 1188 | } | ||
| 1189 | } | ||
| 1190 | } | ||
| 1187 | } | 1191 | } |
| 1188 | 1192 | ||
| 1189 | 1193 | ||
| @@ -1389,16 +1389,16 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retrie | |||
| 1389 | } | 1389 | } |
| 1390 | 1390 | ||
| 1391 | 1391 | ||
| 1392 | void enqueue_email(snac *snac, xs_str *msg, int retries) | 1392 | void enqueue_email(xs_str *msg, int retries) |
| 1393 | /* enqueues an email message to be sent */ | 1393 | /* enqueues an email message to be sent */ |
| 1394 | { | 1394 | { |
| 1395 | xs *qmsg = _new_qmsg("email", msg, retries); | 1395 | xs *qmsg = _new_qmsg("email", msg, retries); |
| 1396 | char *ntid = xs_dict_get(qmsg, "ntid"); | 1396 | char *ntid = xs_dict_get(qmsg, "ntid"); |
| 1397 | xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); | 1397 | xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); |
| 1398 | 1398 | ||
| 1399 | qmsg = _enqueue_put(fn, qmsg); | 1399 | qmsg = _enqueue_put(fn, qmsg); |
| 1400 | 1400 | ||
| 1401 | snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries)); | 1401 | srv_debug(1, xs_fmt("enqueue_email %d", retries)); |
| 1402 | } | 1402 | } |
| 1403 | 1403 | ||
| 1404 | 1404 | ||
| @@ -280,6 +280,9 @@ static void *queue_thread(void *arg) | |||
| 280 | } | 280 | } |
| 281 | } | 281 | } |
| 282 | 282 | ||
| 283 | /* global queue */ | ||
| 284 | process_queue(); | ||
| 285 | |||
| 283 | /* time to purge? */ | 286 | /* time to purge? */ |
| 284 | if ((t = time(NULL)) > purge_time) { | 287 | if ((t = time(NULL)) > purge_time) { |
| 285 | pthread_t pth; | 288 | pthread_t pth; |
| @@ -126,10 +126,11 @@ d_char *history_list(snac *snac); | |||
| 126 | void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); | 126 | void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); |
| 127 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); | 127 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); |
| 128 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); | 128 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); |
| 129 | void enqueue_email(snac *snac, xs_str *msg, int retries); | 129 | void enqueue_email(xs_str *msg, int retries); |
| 130 | void enqueue_message(snac *snac, char *msg); | 130 | void enqueue_message(snac *snac, char *msg); |
| 131 | 131 | ||
| 132 | xs_list *user_queue(snac *snac); | 132 | xs_list *user_queue(snac *snac); |
| 133 | xs_list *queue(void); | ||
| 133 | xs_dict *dequeue(const char *fn); | 134 | xs_dict *dequeue(const char *fn); |
| 134 | 135 | ||
| 135 | void purge(snac *snac); | 136 | void purge(snac *snac); |
| @@ -165,7 +166,11 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s | |||
| 165 | d_char *get_actor_inbox(snac *snac, char *actor); | 166 | d_char *get_actor_inbox(snac *snac, char *actor); |
| 166 | int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout); | 167 | int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout); |
| 167 | int is_msg_public(snac *snac, char *msg); | 168 | int is_msg_public(snac *snac, char *msg); |
| 169 | |||
| 168 | void process_user_queue(snac *snac); | 170 | void process_user_queue(snac *snac); |
| 171 | |||
| 172 | void process_queue(void); | ||
| 173 | |||
| 169 | void post(snac *snac, char *msg); | 174 | void post(snac *snac, char *msg); |
| 170 | int activitypub_get_handler(d_char *req, char *q_path, | 175 | int activitypub_get_handler(d_char *req, char *q_path, |
| 171 | char **body, int *b_size, char **ctype); | 176 | char **body, int *b_size, char **ctype); |