summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar default2023-02-02 05:21:16 +0100
committerGravatar default2023-02-02 05:21:16 +0100
commit928f22fbbaec5d52836b217ec7a281b880b23f92 (patch)
tree07990b389795025cbd184dfc79d8807ae96d9a5a
parentNew function queue() (the global queue). (diff)
downloadsnac2-928f22fbbaec5d52836b217ec7a281b880b23f92.tar.gz
snac2-928f22fbbaec5d52836b217ec7a281b880b23f92.tar.xz
snac2-928f22fbbaec5d52836b217ec7a281b880b23f92.zip
Email notifications have been moved to the global queue.
-rw-r--r--activitypub.c46
-rw-r--r--data.c6
-rw-r--r--httpd.c3
-rw-r--r--snac.h7
4 files changed, 37 insertions, 25 deletions
diff --git a/activitypub.c b/activitypub.c
index 5cc059d..9c9ea7c 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -839,7 +839,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
839 body = xs_str_cat(body, s1); 839 body = xs_str_cat(body, s1);
840 } 840 }
841 841
842 enqueue_email(snac, body, 0); 842 enqueue_email(body, 0);
843} 843}
844 844
845 845
@@ -1137,26 +1137,6 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
1137 } 1137 }
1138 } 1138 }
1139 } 1139 }
1140 else
1141 if (strcmp(type, "email") == 0) {
1142 /* send this email */
1143 xs_str *msg = xs_dict_get(q_item, "message");
1144 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
1145
1146 if (!send_email(msg))
1147 snac_debug(snac, 1, xs_fmt("email message sent"));
1148 else {
1149 if (retries > queue_retry_max)
1150 snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
1151 else {
1152 /* requeue */
1153 snac_log(snac, xs_fmt(
1154 "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
1155
1156 enqueue_email(snac, msg, retries + 1);
1157 }
1158 }
1159 }
1160} 1140}
1161 1141
1162 1142
@@ -1184,6 +1164,30 @@ void process_user_queue(snac *snac)
1184void process_queue_item(xs_dict *q_item) 1164void process_queue_item(xs_dict *q_item)
1185/* processes an item from the global queue */ 1165/* processes an item from the global queue */
1186{ 1166{
1167 char *type = xs_dict_get(q_item, "type");
1168 int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
1169
1170 if (strcmp(type, "email") == 0) {
1171 /* send this email */
1172 xs_str *msg = xs_dict_get(q_item, "message");
1173 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
1174
1175 if (!send_email(msg))
1176 srv_debug(1, xs_fmt("email message sent"));
1177 else {
1178 retries++;
1179
1180 if (retries > queue_retry_max)
1181 srv_log(xs_fmt("process_queue email giving up (errno: %d)", errno));
1182 else {
1183 /* requeue */
1184 srv_log(xs_fmt(
1185 "process_queue email requeue #%d (errno: %d)", retries, errno));
1186
1187 enqueue_email(msg, retries);
1188 }
1189 }
1190 }
1187} 1191}
1188 1192
1189 1193
diff --git a/data.c b/data.c
index 96583aa..da8d422 100644
--- a/data.c
+++ b/data.c
@@ -1389,16 +1389,16 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retrie
1389} 1389}
1390 1390
1391 1391
1392void enqueue_email(snac *snac, xs_str *msg, int retries) 1392void enqueue_email(xs_str *msg, int retries)
1393/* enqueues an email message to be sent */ 1393/* enqueues an email message to be sent */
1394{ 1394{
1395 xs *qmsg = _new_qmsg("email", msg, retries); 1395 xs *qmsg = _new_qmsg("email", msg, retries);
1396 char *ntid = xs_dict_get(qmsg, "ntid"); 1396 char *ntid = xs_dict_get(qmsg, "ntid");
1397 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); 1397 xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid);
1398 1398
1399 qmsg = _enqueue_put(fn, qmsg); 1399 qmsg = _enqueue_put(fn, qmsg);
1400 1400
1401 snac_debug(snac, 1, xs_fmt("enqueue_email %d", retries)); 1401 srv_debug(1, xs_fmt("enqueue_email %d", retries));
1402} 1402}
1403 1403
1404 1404
diff --git a/httpd.c b/httpd.c
index c47b841..7932982 100644
--- a/httpd.c
+++ b/httpd.c
@@ -280,6 +280,9 @@ static void *queue_thread(void *arg)
280 } 280 }
281 } 281 }
282 282
283 /* global queue */
284 process_queue();
285
283 /* time to purge? */ 286 /* time to purge? */
284 if ((t = time(NULL)) > purge_time) { 287 if ((t = time(NULL)) > purge_time) {
285 pthread_t pth; 288 pthread_t pth;
diff --git a/snac.h b/snac.h
index 55ed49d..78007f7 100644
--- a/snac.h
+++ b/snac.h
@@ -126,10 +126,11 @@ d_char *history_list(snac *snac);
126void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries); 126void enqueue_input(snac *snac, xs_dict *msg, xs_dict *req, int retries);
127void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); 127void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries);
128void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries); 128void enqueue_output_by_actor(snac *snac, xs_dict *msg, xs_str *actor, int retries);
129void enqueue_email(snac *snac, xs_str *msg, int retries); 129void enqueue_email(xs_str *msg, int retries);
130void enqueue_message(snac *snac, char *msg); 130void enqueue_message(snac *snac, char *msg);
131 131
132xs_list *user_queue(snac *snac); 132xs_list *user_queue(snac *snac);
133xs_list *queue(void);
133xs_dict *dequeue(const char *fn); 134xs_dict *dequeue(const char *fn);
134 135
135void purge(snac *snac); 136void purge(snac *snac);
@@ -165,7 +166,11 @@ int send_to_inbox(snac *snac, char *inbox, char *msg, d_char **payload, int *p_s
165d_char *get_actor_inbox(snac *snac, char *actor); 166d_char *get_actor_inbox(snac *snac, char *actor);
166int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout); 167int send_to_actor(snac *snac, char *actor, char *msg, d_char **payload, int *p_size, int timeout);
167int is_msg_public(snac *snac, char *msg); 168int is_msg_public(snac *snac, char *msg);
169
168void process_user_queue(snac *snac); 170void process_user_queue(snac *snac);
171
172void process_queue(void);
173
169void post(snac *snac, char *msg); 174void post(snac *snac, char *msg);
170int activitypub_get_handler(d_char *req, char *q_path, 175int activitypub_get_handler(d_char *req, char *q_path,
171 char **body, int *b_size, char **ctype); 176 char **body, int *b_size, char **ctype);