summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--activitypub.c184
1 files changed, 96 insertions, 88 deletions
diff --git a/activitypub.c b/activitypub.c
index 9bffb5b..ba6e45e 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -845,7 +845,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg)
845 845
846/** queues **/ 846/** queues **/
847 847
848int process_message(snac *snac, char *msg, char *req) 848int process_input_message(snac *snac, char *msg, char *req)
849/* processes an ActivityPub message from the input queue */ 849/* processes an ActivityPub message from the input queue */
850{ 850{
851 /* actor and type exist, were checked previously */ 851 /* actor and type exist, were checked previously */
@@ -1065,114 +1065,122 @@ int send_email(char *msg)
1065} 1065}
1066 1066
1067 1067
1068void process_queue(snac *snac) 1068void process_queue_item(snac *snac, xs_dict *q_item)
1069/* processes the queue */ 1069/* processes an item from the queue */
1070{ 1070{
1071 xs *list; 1071 char *type;
1072 char *p, *fn;
1073 int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); 1072 int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
1074 1073
1075 list = queue(snac); 1074 if ((type = xs_dict_get(q_item, "type")) == NULL)
1075 type = "output";
1076 1076
1077 p = list; 1077 if (strcmp(type, "message") == 0) {
1078 while (xs_list_iter(&p, &fn)) { 1078 xs_dict *msg = xs_dict_get(q_item, "message");
1079 xs *q_item = dequeue(snac, fn); 1079 xs *inboxes = inbox_list(snac, msg);
1080 char *type; 1080 xs_list *p;
1081 xs_str *inbox;
1081 1082
1082 if (q_item == NULL) { 1083 p = inboxes;
1083 snac_log(snac, xs_fmt("process_queue q_item error")); 1084 while (xs_list_iter(&p, &inbox)) {
1084 continue; 1085 enqueue_output(snac, msg, inbox, 0);
1085 } 1086 }
1087 }
1088 else
1089 if (strcmp(type, "output") == 0) {
1090 int status;
1091 xs_str *inbox = xs_dict_get(q_item, "inbox");
1092 xs_dict *msg = xs_dict_get(q_item, "message");
1093 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
1094 xs *payload = NULL;
1095 int p_size = 0;
1096
1097 if (xs_is_null(inbox) || xs_is_null(msg))
1098 return;
1086 1099
1087 if ((type = xs_dict_get(q_item, "type")) == NULL) 1100 /* deliver */
1088 type = "output"; 1101 status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
1089 1102
1090 if (strcmp(type, "message") == 0) { 1103 snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
1091 char *msg = xs_dict_get(q_item, "message");
1092 xs *inboxes = inbox_list(snac, msg);
1093 char *p, *v;
1094 1104
1095 p = inboxes; 1105 if (!valid_status(status)) {
1096 while (xs_list_iter(&p, &v)) { 1106 /* error sending; requeue? */
1097 enqueue_output(snac, msg, v, 0); 1107 if (status == 404 || status == 410)
1098 } 1108 /* explicit error: discard */
1099 } 1109 snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status));
1100 else 1110 else
1101 if (strcmp(type, "output") == 0) { 1111 if (retries > queue_retry_max)
1102 int status; 1112 snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
1103 char *inbox = xs_dict_get(q_item, "inbox"); 1113 else {
1104 char *msg = xs_dict_get(q_item, "message"); 1114 /* requeue */
1105 int retries = xs_number_get(xs_dict_get(q_item, "retries")); 1115 enqueue_output(snac, msg, inbox, retries + 1);
1106 xs *payload = NULL; 1116 snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1));
1107 int p_size = 0;
1108
1109 if (xs_is_null(inbox) || xs_is_null(msg))
1110 continue;
1111
1112 /* deliver */
1113 status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8);
1114
1115 snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
1116
1117 if (!valid_status(status)) {
1118 /* error sending; requeue? */
1119 if (status == 404 || status == 410)
1120 /* explicit error: discard */
1121 snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status));
1122 else
1123 if (retries > queue_retry_max)
1124 snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
1125 else {
1126 /* requeue */
1127 enqueue_output(snac, msg, inbox, retries + 1);
1128 snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1));
1129 }
1130 } 1117 }
1131 } 1118 }
1132 else 1119 }
1133 if (strcmp(type, "input") == 0) { 1120 else
1134 /* process the message */ 1121 if (strcmp(type, "input") == 0) {
1135 char *msg = xs_dict_get(q_item, "message"); 1122 /* process the message */
1136 char *req = xs_dict_get(q_item, "req"); 1123 xs_dict *msg = xs_dict_get(q_item, "message");
1137 int retries = xs_number_get(xs_dict_get(q_item, "retries")); 1124 xs_dict *req = xs_dict_get(q_item, "req");
1138 1125 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
1139 if (xs_is_null(msg)) 1126
1140 continue; 1127 if (xs_is_null(msg))
1141 1128 return;
1142 if (!process_message(snac, msg, req)) { 1129
1143 if (retries > queue_retry_max) 1130 if (!process_input_message(snac, msg, req)) {
1144 snac_log(snac, xs_fmt("process_queue input giving up")); 1131 if (retries > queue_retry_max)
1145 else { 1132 snac_log(snac, xs_fmt("process_queue input giving up"));
1146 /* reenqueue */ 1133 else {
1147 enqueue_input(snac, msg, req, retries + 1); 1134 /* reenqueue */
1148 snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1)); 1135 enqueue_input(snac, msg, req, retries + 1);
1149 } 1136 snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1));
1150 } 1137 }
1151 } 1138 }
1152 else 1139 }
1153 if (strcmp(type, "email") == 0) { 1140 else
1154 /* send this email */ 1141 if (strcmp(type, "email") == 0) {
1155 char *msg = xs_dict_get(q_item, "message"); 1142 /* send this email */
1156 int retries = xs_number_get(xs_dict_get(q_item, "retries")); 1143 xs_str *msg = xs_dict_get(q_item, "message");
1144 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
1157 1145
1158 if (!send_email(msg)) 1146 if (!send_email(msg))
1159 snac_debug(snac, 1, xs_fmt("email message sent")); 1147 snac_debug(snac, 1, xs_fmt("email message sent"));
1148 else {
1149 if (retries > queue_retry_max)
1150 snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno));
1160 else { 1151 else {
1161 if (retries > queue_retry_max) 1152 /* requeue */
1162 snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); 1153 snac_log(snac, xs_fmt(
1163 else { 1154 "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
1164 /* requeue */ 1155
1165 snac_log(snac, xs_fmt( 1156 enqueue_email(snac, msg, retries + 1);
1166 "process_queue email requeue #%d (errno: %d)", retries + 1, errno));
1167
1168 enqueue_email(snac, msg, retries + 1);
1169 }
1170 } 1157 }
1171 } 1158 }
1172 } 1159 }
1173} 1160}
1174 1161
1175 1162
1163void process_queue(snac *snac)
1164/* processes the queue */
1165{
1166 xs *list = queue(snac);
1167
1168 xs_list *p = list;
1169 xs_str *fn;
1170
1171 while (xs_list_iter(&p, &fn)) {
1172 xs *q_item = dequeue(snac, fn);
1173
1174 if (q_item == NULL) {
1175 snac_log(snac, xs_fmt("process_queue q_item error"));
1176 continue;
1177 }
1178
1179 process_queue_item(snac, q_item);
1180 }
1181}
1182
1183
1176/** HTTP handlers */ 1184/** HTTP handlers */
1177 1185
1178int activitypub_get_handler(d_char *req, char *q_path, 1186int activitypub_get_handler(d_char *req, char *q_path,