summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--activitypub.c41
-rw-r--r--data.c10
-rw-r--r--snac.h2
3 files changed, 39 insertions, 14 deletions
diff --git a/activitypub.c b/activitypub.c
index 0f801f7..7059471 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -212,6 +212,28 @@ d_char *recipient_list(snac *snac, char *msg, int expand_public)
212} 212}
213 213
214 214
215d_char *inbox_list(snac *snac, char *msg)
216/* returns the list of inboxes that are recipients of this message */
217{
218 d_char *list = xs_list_new();
219 xs *rcpts = recipient_list(snac, msg, 1);
220 char *p, *v;
221
222 p = rcpts;
223 while (xs_list_iter(&p, &v)) {
224 xs *inbox;
225
226 if ((inbox = get_actor_inbox(snac, v)) != NULL) {
227 /* add the inbox if it's not already there */
228 if (xs_list_in(list, inbox) == -1)
229 list = xs_list_append(list, inbox);
230 }
231 }
232
233 return list;
234}
235
236
215int is_msg_public(snac *snac, char *msg) 237int is_msg_public(snac *snac, char *msg)
216/* checks if a message is public */ 238/* checks if a message is public */
217{ 239{
@@ -930,27 +952,30 @@ void process_queue(snac *snac)
930 952
931 if (strcmp(type, "output") == 0) { 953 if (strcmp(type, "output") == 0) {
932 int status; 954 int status;
933 char *actor = xs_dict_get(q_item, "actor"); 955 char *inbox = xs_dict_get(q_item, "inbox");
934 char *msg = xs_dict_get(q_item, "object"); 956 char *msg = xs_dict_get(q_item, "object");
935 int retries = xs_number_get(xs_dict_get(q_item, "retries")); 957 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
936 xs *payload = NULL; 958 xs *payload = NULL;
937 int p_size = 0; 959 int p_size = 0;
938 960
961 if (xs_is_null(inbox) || xs_is_null(msg))
962 continue;
963
939 /* deliver */ 964 /* deliver */
940 status = send_to_actor(snac, actor, msg, &payload, &p_size); 965 status = send_to_inbox(snac, inbox, msg, &payload, &p_size);
941 966
942 if (!valid_status(status)) { 967 if (!valid_status(status)) {
943 /* error sending; requeue? */ 968 /* error sending; requeue? */
944 if (retries > queue_retry_max) 969 if (retries > queue_retry_max)
945 snac_log(snac, xs_fmt("process_queue giving up %s %d", actor, status)); 970 snac_log(snac, xs_fmt("process_queue giving up %s %d", inbox, status));
946 else { 971 else {
947 /* requeue */ 972 /* requeue */
948 enqueue_output(snac, msg, actor, retries + 1); 973 enqueue_output(snac, msg, inbox, retries + 1);
949 snac_log(snac, xs_fmt("process_queue requeue %s %d", actor, retries + 1)); 974 snac_log(snac, xs_fmt("process_queue requeue %s %d", inbox, retries + 1));
950 } 975 }
951 } 976 }
952 else 977 else
953 snac_log(snac, xs_fmt("process_queue sent to actor %s %d", actor, status)); 978 snac_log(snac, xs_fmt("process_queue sent to inbox %s %d", inbox, status));
954 } 979 }
955 else 980 else
956 if (strcmp(type, "input") == 0) { 981 if (strcmp(type, "input") == 0) {
@@ -1005,10 +1030,10 @@ void process_queue(snac *snac)
1005void post(snac *snac, char *msg) 1030void post(snac *snac, char *msg)
1006/* enqueues a message to all its recipients */ 1031/* enqueues a message to all its recipients */
1007{ 1032{
1008 xs *rcpts = recipient_list(snac, msg, 1); 1033 xs *inboxes = inbox_list(snac, msg);
1009 char *p, *v; 1034 char *p, *v;
1010 1035
1011 p = rcpts; 1036 p = inboxes;
1012 while (xs_list_iter(&p, &v)) { 1037 while (xs_list_iter(&p, &v)) {
1013 enqueue_output(snac, msg, v, 0); 1038 enqueue_output(snac, msg, v, 0);
1014 } 1039 }
diff --git a/data.c b/data.c
index 3c25123..2fbb49e 100644
--- a/data.c
+++ b/data.c
@@ -1063,11 +1063,11 @@ void enqueue_input(snac *snac, char *msg, char *req, int retries)
1063} 1063}
1064 1064
1065 1065
1066void enqueue_output(snac *snac, char *msg, char *actor, int retries) 1066void enqueue_output(snac *snac, char *msg, char *inbox, int retries)
1067/* enqueues an output message for an actor */ 1067/* enqueues an output message for an actor */
1068{ 1068{
1069 if (strcmp(actor, snac->actor) == 0) { 1069 if (xs_startswith(inbox, snac->actor)) {
1070 snac_debug(snac, 1, xs_str_new("enqueue refused to myself")); 1070 snac_debug(snac, 1, xs_str_new("refusing enqueue to myself"));
1071 return; 1071 return;
1072 } 1072 }
1073 1073
@@ -1078,13 +1078,13 @@ void enqueue_output(snac *snac, char *msg, char *actor, int retries)
1078 xs *rn = xs_number_new(retries); 1078 xs *rn = xs_number_new(retries);
1079 1079
1080 qmsg = xs_dict_append(qmsg, "type", "output"); 1080 qmsg = xs_dict_append(qmsg, "type", "output");
1081 qmsg = xs_dict_append(qmsg, "actor", actor); 1081 qmsg = xs_dict_append(qmsg, "inbox", inbox);
1082 qmsg = xs_dict_append(qmsg, "object", msg); 1082 qmsg = xs_dict_append(qmsg, "object", msg);
1083 qmsg = xs_dict_append(qmsg, "retries", rn); 1083 qmsg = xs_dict_append(qmsg, "retries", rn);
1084 1084
1085 _enqueue_put(fn, qmsg); 1085 _enqueue_put(fn, qmsg);
1086 1086
1087 snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries)); 1087 snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", inbox, fn, retries));
1088} 1088}
1089 1089
1090 1090
diff --git a/snac.h b/snac.h
index 4e4a981..45c099e 100644
--- a/snac.h
+++ b/snac.h
@@ -92,7 +92,7 @@ int history_del(snac *snac, char *id);
92d_char *history_list(snac *snac); 92d_char *history_list(snac *snac);
93 93
94void enqueue_input(snac *snac, char *msg, char *req, int retries); 94void enqueue_input(snac *snac, char *msg, char *req, int retries);
95void enqueue_output(snac *snac, char *msg, char *actor, int retries); 95void enqueue_output(snac *snac, char *msg, char *inbox, int retries);
96void enqueue_email(snac *snac, char *msg, int retries); 96void enqueue_email(snac *snac, char *msg, int retries);
97 97
98d_char *queue(snac *snac); 98d_char *queue(snac *snac);