diff options
| -rw-r--r-- | activitypub.c | 184 |
1 files changed, 96 insertions, 88 deletions
diff --git a/activitypub.c b/activitypub.c index 9bffb5b..ba6e45e 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -845,7 +845,7 @@ void notify(snac *snac, char *type, char *utype, char *actor, char *msg) | |||
| 845 | 845 | ||
| 846 | /** queues **/ | 846 | /** queues **/ |
| 847 | 847 | ||
| 848 | int process_message(snac *snac, char *msg, char *req) | 848 | int process_input_message(snac *snac, char *msg, char *req) |
| 849 | /* processes an ActivityPub message from the input queue */ | 849 | /* processes an ActivityPub message from the input queue */ |
| 850 | { | 850 | { |
| 851 | /* actor and type exist, were checked previously */ | 851 | /* actor and type exist, were checked previously */ |
| @@ -1065,114 +1065,122 @@ int send_email(char *msg) | |||
| 1065 | } | 1065 | } |
| 1066 | 1066 | ||
| 1067 | 1067 | ||
| 1068 | void process_queue(snac *snac) | 1068 | void process_queue_item(snac *snac, xs_dict *q_item) |
| 1069 | /* processes the queue */ | 1069 | /* processes an item from the queue */ |
| 1070 | { | 1070 | { |
| 1071 | xs *list; | 1071 | char *type; |
| 1072 | char *p, *fn; | ||
| 1073 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); | 1072 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); |
| 1074 | 1073 | ||
| 1075 | list = queue(snac); | 1074 | if ((type = xs_dict_get(q_item, "type")) == NULL) |
| 1075 | type = "output"; | ||
| 1076 | 1076 | ||
| 1077 | p = list; | 1077 | if (strcmp(type, "message") == 0) { |
| 1078 | while (xs_list_iter(&p, &fn)) { | 1078 | xs_dict *msg = xs_dict_get(q_item, "message"); |
| 1079 | xs *q_item = dequeue(snac, fn); | 1079 | xs *inboxes = inbox_list(snac, msg); |
| 1080 | char *type; | 1080 | xs_list *p; |
| 1081 | xs_str *inbox; | ||
| 1081 | 1082 | ||
| 1082 | if (q_item == NULL) { | 1083 | p = inboxes; |
| 1083 | snac_log(snac, xs_fmt("process_queue q_item error")); | 1084 | while (xs_list_iter(&p, &inbox)) { |
| 1084 | continue; | 1085 | enqueue_output(snac, msg, inbox, 0); |
| 1085 | } | 1086 | } |
| 1087 | } | ||
| 1088 | else | ||
| 1089 | if (strcmp(type, "output") == 0) { | ||
| 1090 | int status; | ||
| 1091 | xs_str *inbox = xs_dict_get(q_item, "inbox"); | ||
| 1092 | xs_dict *msg = xs_dict_get(q_item, "message"); | ||
| 1093 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1094 | xs *payload = NULL; | ||
| 1095 | int p_size = 0; | ||
| 1096 | |||
| 1097 | if (xs_is_null(inbox) || xs_is_null(msg)) | ||
| 1098 | return; | ||
| 1086 | 1099 | ||
| 1087 | if ((type = xs_dict_get(q_item, "type")) == NULL) | 1100 | /* deliver */ |
| 1088 | type = "output"; | 1101 | status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); |
| 1089 | 1102 | ||
| 1090 | if (strcmp(type, "message") == 0) { | 1103 | snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status)); |
| 1091 | char *msg = xs_dict_get(q_item, "message"); | ||
| 1092 | xs *inboxes = inbox_list(snac, msg); | ||
| 1093 | char *p, *v; | ||
| 1094 | 1104 | ||
| 1095 | p = inboxes; | 1105 | if (!valid_status(status)) { |
| 1096 | while (xs_list_iter(&p, &v)) { | 1106 | /* error sending; requeue? */ |
| 1097 | enqueue_output(snac, msg, v, 0); | 1107 | if (status == 404 || status == 410) |
| 1098 | } | 1108 | /* explicit error: discard */ |
| 1099 | } | 1109 | snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status)); |
| 1100 | else | 1110 | else |
| 1101 | if (strcmp(type, "output") == 0) { | 1111 | if (retries > queue_retry_max) |
| 1102 | int status; | 1112 | snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status)); |
| 1103 | char *inbox = xs_dict_get(q_item, "inbox"); | 1113 | else { |
| 1104 | char *msg = xs_dict_get(q_item, "message"); | 1114 | /* requeue */ |
| 1105 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | 1115 | enqueue_output(snac, msg, inbox, retries + 1); |
| 1106 | xs *payload = NULL; | 1116 | snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1)); |
| 1107 | int p_size = 0; | ||
| 1108 | |||
| 1109 | if (xs_is_null(inbox) || xs_is_null(msg)) | ||
| 1110 | continue; | ||
| 1111 | |||
| 1112 | /* deliver */ | ||
| 1113 | status = send_to_inbox(snac, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); | ||
| 1114 | |||
| 1115 | snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status)); | ||
| 1116 | |||
| 1117 | if (!valid_status(status)) { | ||
| 1118 | /* error sending; requeue? */ | ||
| 1119 | if (status == 404 || status == 410) | ||
| 1120 | /* explicit error: discard */ | ||
| 1121 | snac_log(snac, xs_fmt("process_queue error %s %d", inbox, status)); | ||
| 1122 | else | ||
| 1123 | if (retries > queue_retry_max) | ||
| 1124 | snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status)); | ||
| 1125 | else { | ||
| 1126 | /* requeue */ | ||
| 1127 | enqueue_output(snac, msg, inbox, retries + 1); | ||
| 1128 | snac_log(snac, xs_fmt("process_queue requeue %s #%d", inbox, retries + 1)); | ||
| 1129 | } | ||
| 1130 | } | 1117 | } |
| 1131 | } | 1118 | } |
| 1132 | else | 1119 | } |
| 1133 | if (strcmp(type, "input") == 0) { | 1120 | else |
| 1134 | /* process the message */ | 1121 | if (strcmp(type, "input") == 0) { |
| 1135 | char *msg = xs_dict_get(q_item, "message"); | 1122 | /* process the message */ |
| 1136 | char *req = xs_dict_get(q_item, "req"); | 1123 | xs_dict *msg = xs_dict_get(q_item, "message"); |
| 1137 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | 1124 | xs_dict *req = xs_dict_get(q_item, "req"); |
| 1138 | 1125 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | |
| 1139 | if (xs_is_null(msg)) | 1126 | |
| 1140 | continue; | 1127 | if (xs_is_null(msg)) |
| 1141 | 1128 | return; | |
| 1142 | if (!process_message(snac, msg, req)) { | 1129 | |
| 1143 | if (retries > queue_retry_max) | 1130 | if (!process_input_message(snac, msg, req)) { |
| 1144 | snac_log(snac, xs_fmt("process_queue input giving up")); | 1131 | if (retries > queue_retry_max) |
| 1145 | else { | 1132 | snac_log(snac, xs_fmt("process_queue input giving up")); |
| 1146 | /* reenqueue */ | 1133 | else { |
| 1147 | enqueue_input(snac, msg, req, retries + 1); | 1134 | /* reenqueue */ |
| 1148 | snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1)); | 1135 | enqueue_input(snac, msg, req, retries + 1); |
| 1149 | } | 1136 | snac_log(snac, xs_fmt("process_queue input requeue #%d", retries + 1)); |
| 1150 | } | 1137 | } |
| 1151 | } | 1138 | } |
| 1152 | else | 1139 | } |
| 1153 | if (strcmp(type, "email") == 0) { | 1140 | else |
| 1154 | /* send this email */ | 1141 | if (strcmp(type, "email") == 0) { |
| 1155 | char *msg = xs_dict_get(q_item, "message"); | 1142 | /* send this email */ |
| 1156 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | 1143 | xs_str *msg = xs_dict_get(q_item, "message"); |
| 1144 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 1157 | 1145 | ||
| 1158 | if (!send_email(msg)) | 1146 | if (!send_email(msg)) |
| 1159 | snac_debug(snac, 1, xs_fmt("email message sent")); | 1147 | snac_debug(snac, 1, xs_fmt("email message sent")); |
| 1148 | else { | ||
| 1149 | if (retries > queue_retry_max) | ||
| 1150 | snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); | ||
| 1160 | else { | 1151 | else { |
| 1161 | if (retries > queue_retry_max) | 1152 | /* requeue */ |
| 1162 | snac_log(snac, xs_fmt("process_queue email giving up (errno: %d)", errno)); | 1153 | snac_log(snac, xs_fmt( |
| 1163 | else { | 1154 | "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); |
| 1164 | /* requeue */ | 1155 | |
| 1165 | snac_log(snac, xs_fmt( | 1156 | enqueue_email(snac, msg, retries + 1); |
| 1166 | "process_queue email requeue #%d (errno: %d)", retries + 1, errno)); | ||
| 1167 | |||
| 1168 | enqueue_email(snac, msg, retries + 1); | ||
| 1169 | } | ||
| 1170 | } | 1157 | } |
| 1171 | } | 1158 | } |
| 1172 | } | 1159 | } |
| 1173 | } | 1160 | } |
| 1174 | 1161 | ||
| 1175 | 1162 | ||
| 1163 | void process_queue(snac *snac) | ||
| 1164 | /* processes the queue */ | ||
| 1165 | { | ||
| 1166 | xs *list = queue(snac); | ||
| 1167 | |||
| 1168 | xs_list *p = list; | ||
| 1169 | xs_str *fn; | ||
| 1170 | |||
| 1171 | while (xs_list_iter(&p, &fn)) { | ||
| 1172 | xs *q_item = dequeue(snac, fn); | ||
| 1173 | |||
| 1174 | if (q_item == NULL) { | ||
| 1175 | snac_log(snac, xs_fmt("process_queue q_item error")); | ||
| 1176 | continue; | ||
| 1177 | } | ||
| 1178 | |||
| 1179 | process_queue_item(snac, q_item); | ||
| 1180 | } | ||
| 1181 | } | ||
| 1182 | |||
| 1183 | |||
| 1176 | /** HTTP handlers */ | 1184 | /** HTTP handlers */ |
| 1177 | 1185 | ||
| 1178 | int activitypub_get_handler(d_char *req, char *q_path, | 1186 | int activitypub_get_handler(d_char *req, char *q_path, |