summaryrefslogtreecommitdiff
path: root/activitypub.c
diff options
context:
space:
mode:
authorGravatar default2022-11-17 18:33:54 +0100
committerGravatar default2022-11-17 18:33:54 +0100
commitf0e17d67537ee020ec391cd4459b77dfdad7ef24 (patch)
tree1c37b4c9415ae893eaa253a3015d03a1a8cdde0e /activitypub.c
parentMove send to actor logging to process_queue(). (diff)
downloadsnac2-f0e17d67537ee020ec391cd4459b77dfdad7ef24.tar.gz
snac2-f0e17d67537ee020ec391cd4459b77dfdad7ef24.tar.xz
snac2-f0e17d67537ee020ec391cd4459b77dfdad7ef24.zip
Queue messages to inboxes instead of actors.
Diffstat (limited to 'activitypub.c')
-rw-r--r--activitypub.c41
1 files changed, 33 insertions, 8 deletions
diff --git a/activitypub.c b/activitypub.c
index 0f801f7..7059471 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -212,6 +212,28 @@ d_char *recipient_list(snac *snac, char *msg, int expand_public)
212} 212}
213 213
214 214
215d_char *inbox_list(snac *snac, char *msg)
216/* returns the list of inboxes that are recipients of this message */
217{
218 d_char *list = xs_list_new();
219 xs *rcpts = recipient_list(snac, msg, 1);
220 char *p, *v;
221
222 p = rcpts;
223 while (xs_list_iter(&p, &v)) {
224 xs *inbox;
225
226 if ((inbox = get_actor_inbox(snac, v)) != NULL) {
227 /* add the inbox if it's not already there */
228 if (xs_list_in(list, inbox) == -1)
229 list = xs_list_append(list, inbox);
230 }
231 }
232
233 return list;
234}
235
236
215int is_msg_public(snac *snac, char *msg) 237int is_msg_public(snac *snac, char *msg)
216/* checks if a message is public */ 238/* checks if a message is public */
217{ 239{
@@ -930,27 +952,30 @@ void process_queue(snac *snac)
930 952
931 if (strcmp(type, "output") == 0) { 953 if (strcmp(type, "output") == 0) {
932 int status; 954 int status;
933 char *actor = xs_dict_get(q_item, "actor"); 955 char *inbox = xs_dict_get(q_item, "inbox");
934 char *msg = xs_dict_get(q_item, "object"); 956 char *msg = xs_dict_get(q_item, "object");
935 int retries = xs_number_get(xs_dict_get(q_item, "retries")); 957 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
936 xs *payload = NULL; 958 xs *payload = NULL;
937 int p_size = 0; 959 int p_size = 0;
938 960
961 if (xs_is_null(inbox) || xs_is_null(msg))
962 continue;
963
939 /* deliver */ 964 /* deliver */
940 status = send_to_actor(snac, actor, msg, &payload, &p_size); 965 status = send_to_inbox(snac, inbox, msg, &payload, &p_size);
941 966
942 if (!valid_status(status)) { 967 if (!valid_status(status)) {
943 /* error sending; requeue? */ 968 /* error sending; requeue? */
944 if (retries > queue_retry_max) 969 if (retries > queue_retry_max)
945 snac_log(snac, xs_fmt("process_queue giving up %s %d", actor, status)); 970 snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
946 else { 971 else {
947 /* requeue */ 972 /* requeue */
948 enqueue_output(snac, msg, actor, retries + 1); 973 enqueue_output(snac, msg, inbox, retries + 1);
949 snac_log(snac, xs_fmt("process_queue requeue %s %d", actor, retries + 1)); 974 snac_log(snac, xs_fmt("process_queue requeue %s %d", inbox, retries + 1));
950 } 975 }
951 } 976 }
952 else 977 else
953 snac_log(snac, xs_fmt("process_queue sent to actor %s %d", actor, status)); 978 snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
954 } 979 }
955 else 980 else
956 if (strcmp(type, "input") == 0) { 981 if (strcmp(type, "input") == 0) {
@@ -1005,10 +1030,10 @@ void process_queue(snac *snac)
1005void post(snac *snac, char *msg) 1030void post(snac *snac, char *msg)
1006/* enqueues a message to all its recipients */ 1031/* enqueues a message to all its recipients */
1007{ 1032{
1008 xs *rcpts = recipient_list(snac, msg, 1); 1033 xs *inboxes = inbox_list(snac, msg);
1009 char *p, *v; 1034 char *p, *v;
1010 1035
1011 p = rcpts; 1036 p = inboxes;
1012 while (xs_list_iter(&p, &v)) { 1037 while (xs_list_iter(&p, &v)) {
1013 enqueue_output(snac, msg, v, 0); 1038 enqueue_output(snac, msg, v, 0);
1014 } 1039 }