summaryrefslogtreecommitdiff
path: root/activitypub.c
diff options
context:
space:
mode:
Diffstat (limited to 'activitypub.c')
-rw-r--r--activitypub.c283
1 files changed, 221 insertions, 62 deletions
diff --git a/activitypub.c b/activitypub.c
index 1ae5ad9..cade0d9 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -1,5 +1,5 @@
1/* snac - A simple, minimalistic ActivityPub instance */ 1/* snac - A simple, minimalistic ActivityPub instance */
2/* copyright (c) 2022 - 2024 grunfink et al. / MIT license */ 2/* copyright (c) 2022 - 2025 grunfink et al. / MIT license */
3 3
4#include "xs.h" 4#include "xs.h"
5#include "xs_json.h" 5#include "xs_json.h"
@@ -587,6 +587,70 @@ int is_msg_from_private_user(const xs_dict *msg)
587} 587}
588 588
589 589
590int followed_hashtag_check(snac *user, const xs_dict *msg)
591/* returns true if this message contains a hashtag followed by me */
592{
593 const xs_list *fw_tags = xs_dict_get(user->config, "followed_hashtags");
594
595 if (xs_is_list(fw_tags)) {
596 const xs_list *tags_in_msg = xs_dict_get(msg, "tag");
597
598 if (xs_is_list(tags_in_msg)) {
599 const xs_dict *te;
600
601 /* iterate the tags in the message */
602 xs_list_foreach(tags_in_msg, te) {
603 if (xs_is_dict(te)) {
604 const char *type = xs_dict_get(te, "type");
605 const char *name = xs_dict_get(te, "name");
606
607 if (xs_is_string(type) && xs_is_string(name)) {
608 if (strcmp(type, "Hashtag") == 0) {
609 xs *lc_name = xs_utf8_to_lower(name);
610
611 if (xs_list_in(fw_tags, lc_name) != -1)
612 return 1;
613 }
614 }
615 }
616 }
617 }
618 }
619
620 return 0;
621}
622
623
624void followed_hashtag_distribute(const xs_dict *msg)
625/* distribute this post to all users following the included hashtags */
626{
627 const char *id = xs_dict_get(msg, "id");
628 const xs_list *tags_in_msg = xs_dict_get(msg, "tag");
629
630 if (!xs_is_string(id) || !xs_is_list(tags_in_msg) || xs_list_len(tags_in_msg) == 0)
631 return;
632
633 srv_debug(1, xs_fmt("followed_hashtag_distribute check for %s", id));
634
635 xs *users = user_list();
636 const char *uid;
637
638 xs_list_foreach(users, uid) {
639 snac user;
640
641 if (user_open(&user, uid)) {
642 if (followed_hashtag_check(&user, msg)) {
643 timeline_add(&user, id, msg);
644
645 snac_log(&user, xs_fmt("followed hashtag in %s", id));
646 }
647
648 user_free(&user);
649 }
650 }
651}
652
653
590int is_msg_for_me(snac *snac, const xs_dict *c_msg) 654int is_msg_for_me(snac *snac, const xs_dict *c_msg)
591/* checks if this message is for me */ 655/* checks if this message is for me */
592{ 656{
@@ -602,19 +666,32 @@ int is_msg_for_me(snac *snac, const xs_dict *c_msg)
602 if (xs_match(type, "Like|Announce|EmojiReact")) { 666 if (xs_match(type, "Like|Announce|EmojiReact")) {
603 const char *object = xs_dict_get(c_msg, "object"); 667 const char *object = xs_dict_get(c_msg, "object");
604 668
605 if (xs_type(object) == XSTYPE_DICT) 669 if (xs_is_dict(object))
606 object = xs_dict_get(object, "id"); 670 object = xs_dict_get(object, "id");
607 671
608 /* bad object id? reject */ 672 /* bad object id? reject */
609 if (xs_type(object) != XSTYPE_STRING) 673 if (!xs_is_string(object))
610 return 0; 674 return 0;
611 675
612 /* if it's about one of our posts, accept it */ 676 /* if it's about one of our posts, accept it */
613 if (xs_startswith(object, snac->actor)) 677 if (xs_startswith(object, snac->actor))
614 return 2; 678 return 2;
615 679
616 /* if it's by someone we don't follow, reject */ 680 /* if it's by someone we follow, accept it */
617 return following_check(snac, actor); 681 if (following_check(snac, actor))
682 return 1;
683
684 /* do we follow any hashtag? */
685 if (xs_is_list(xs_dict_get(snac->config, "followed_hashtags"))) {
686 xs *obj = NULL;
687
688 /* if the admired object contains any followed hashtag, accept it */
689 if (valid_status(object_get(object, &obj)) &&
690 followed_hashtag_check(snac, obj))
691 return 7;
692 }
693
694 return 0;
618 } 695 }
619 696
620 /* if it's an Undo, it must be from someone related to us */ 697 /* if it's an Undo, it must be from someone related to us */
@@ -675,7 +752,7 @@ int is_msg_for_me(snac *snac, const xs_dict *c_msg)
675 752
676 if (pub_msg) { 753 if (pub_msg) {
677 /* a public message for someone we follow? (probably cc'ed) accept */ 754 /* a public message for someone we follow? (probably cc'ed) accept */
678 if (following_check(snac, v)) 755 if (strcmp(v, public_address) != 0 && following_check(snac, v))
679 return 5; 756 return 5;
680 } 757 }
681 else 758 else
@@ -708,30 +785,8 @@ int is_msg_for_me(snac *snac, const xs_dict *c_msg)
708 } 785 }
709 786
710 /* does this message contain a tag we are following? */ 787 /* does this message contain a tag we are following? */
711 const xs_list *fw_tags = xs_dict_get(snac->config, "followed_hashtags"); 788 if (pub_msg && followed_hashtag_check(snac, msg))
712 if (pub_msg && xs_type(fw_tags) == XSTYPE_LIST) { 789 return 7;
713 const xs_list *tags_in_msg = xs_dict_get(msg, "tag");
714 if (xs_type(tags_in_msg) == XSTYPE_LIST) {
715 const xs_dict *te;
716
717 /* iterate the tags in the message */
718 xs_list_foreach(tags_in_msg, te) {
719 if (xs_type(te) == XSTYPE_DICT) {
720 const char *type = xs_dict_get(te, "type");
721 const char *name = xs_dict_get(te, "name");
722
723 if (xs_type(type) == XSTYPE_STRING && xs_type(name) == XSTYPE_STRING) {
724 if (strcmp(type, "Hashtag") == 0) {
725 xs *lc_name = xs_utf8_to_lower(name);
726
727 if (xs_list_in(fw_tags, lc_name) != -1)
728 return 7;
729 }
730 }
731 }
732 }
733 }
734 }
735 790
736 return 0; 791 return 0;
737} 792}
@@ -889,6 +944,11 @@ void notify(snac *snac, const char *type, const char *utype, const char *actor,
889 /* if it's not an admiration about something by us, done */ 944 /* if it's not an admiration about something by us, done */
890 if (xs_is_null(objid) || !xs_startswith(objid, snac->actor)) 945 if (xs_is_null(objid) || !xs_startswith(objid, snac->actor))
891 return; 946 return;
947
948 /* if it's an announce by our own relay, done */
949 xs *relay_id = xs_fmt("%s/relay", srv_baseurl);
950 if (xs_startswith(id, relay_id))
951 return;
892 } 952 }
893 953
894 /* updated poll? */ 954 /* updated poll? */
@@ -1184,6 +1244,28 @@ xs_dict *msg_repulsion(snac *user, const char *id, const char *type)
1184} 1244}
1185 1245
1186 1246
1247xs_dict *msg_actor_place(snac *user, const char *label)
1248/* creates a Place object, if the user has a location defined */
1249{
1250 xs_dict *place = NULL;
1251 const char *latitude = xs_dict_get_def(user->config, "latitude", "");
1252 const char *longitude = xs_dict_get_def(user->config, "longitude", "");
1253
1254 if (*latitude && *longitude) {
1255 xs *d_la = xs_number_new(atof(latitude));
1256 xs *d_lo = xs_number_new(atof(longitude));
1257
1258 place = msg_base(user, "Place", NULL, user->actor, NULL, NULL);
1259
1260 place = xs_dict_set(place, "name", label);
1261 place = xs_dict_set(place, "latitude", d_la);
1262 place = xs_dict_set(place, "longitude", d_lo);
1263 }
1264
1265 return place;
1266}
1267
1268
1187xs_dict *msg_actor(snac *snac) 1269xs_dict *msg_actor(snac *snac)
1188/* create a Person message for this actor */ 1270/* create a Person message for this actor */
1189{ 1271{
@@ -1194,10 +1276,20 @@ xs_dict *msg_actor(snac *snac)
1194 xs *avtr = NULL; 1276 xs *avtr = NULL;
1195 xs *kid = NULL; 1277 xs *kid = NULL;
1196 xs *f_bio = NULL; 1278 xs *f_bio = NULL;
1197 xs_dict *msg = msg_base(snac, "Person", snac->actor, NULL, NULL, NULL); 1279 xs_dict *msg = NULL;
1198 const char *p; 1280 const char *p;
1199 int n; 1281 int n;
1200 1282
1283 /* everybody loves some caching */
1284 if (time(NULL) - object_mtime(snac->actor) < 3 * 3600 &&
1285 valid_status(object_get(snac->actor, &msg))) {
1286 snac_debug(snac, 2, xs_fmt("Returning cached actor %s", snac->actor));
1287
1288 return msg;
1289 }
1290
1291 msg = msg_base(snac, "Person", snac->actor, NULL, NULL, NULL);
1292
1201 /* change the @context (is this really necessary?) */ 1293 /* change the @context (is this really necessary?) */
1202 ctxt = xs_list_append(ctxt, "https:/" "/www.w3.org/ns/activitystreams"); 1294 ctxt = xs_list_append(ctxt, "https:/" "/www.w3.org/ns/activitystreams");
1203 ctxt = xs_list_append(ctxt, "https:/" "/w3id.org/security/v1"); 1295 ctxt = xs_list_append(ctxt, "https:/" "/w3id.org/security/v1");
@@ -1242,6 +1334,10 @@ xs_dict *msg_actor(snac *snac)
1242 if (xs_type(xs_dict_get(snac->config, "bot")) == XSTYPE_TRUE) 1334 if (xs_type(xs_dict_get(snac->config, "bot")) == XSTYPE_TRUE)
1243 msg = xs_dict_set(msg, "type", "Service"); 1335 msg = xs_dict_set(msg, "type", "Service");
1244 1336
1337 /* if it's named "relay", then identify as an "Application" */
1338 if (strcmp(snac->uid, "relay") == 0)
1339 msg = xs_dict_set(msg, "type", "Application");
1340
1245 /* add the header image, if there is one defined */ 1341 /* add the header image, if there is one defined */
1246 const char *header = xs_dict_get(snac->config, "header"); 1342 const char *header = xs_dict_get(snac->config, "header");
1247 if (!xs_is_null(header)) { 1343 if (!xs_is_null(header)) {
@@ -1307,7 +1403,7 @@ xs_dict *msg_actor(snac *snac)
1307 } 1403 }
1308 1404
1309 /* use shared inboxes? */ 1405 /* use shared inboxes? */
1310 if (xs_type(xs_dict_get(srv_config, "shared_inboxes")) == XSTYPE_TRUE) { 1406 if (xs_is_true(xs_dict_get(srv_config, "shared_inboxes")) || strcmp(snac->uid, "relay") == 0) {
1311 xs *d = xs_dict_new(); 1407 xs *d = xs_dict_new();
1312 xs *si = xs_fmt("%s/shared-inbox", srv_baseurl); 1408 xs *si = xs_fmt("%s/shared-inbox", srv_baseurl);
1313 d = xs_dict_append(d, "sharedInbox", si); 1409 d = xs_dict_append(d, "sharedInbox", si);
@@ -1326,6 +1422,15 @@ xs_dict *msg_actor(snac *snac)
1326 msg = xs_dict_set(msg, "manuallyApprovesFollowers", 1422 msg = xs_dict_set(msg, "manuallyApprovesFollowers",
1327 xs_stock(xs_is_true(manually) ? XSTYPE_TRUE : XSTYPE_FALSE)); 1423 xs_stock(xs_is_true(manually) ? XSTYPE_TRUE : XSTYPE_FALSE));
1328 1424
1425 /* if there are location coords, create a Place object */
1426 xs *location = msg_actor_place(snac, "Home");
1427 if (xs_type(location) == XSTYPE_DICT)
1428 msg = xs_dict_set(msg, "location", location);
1429
1430 /* cache it */
1431 snac_debug(snac, 1, xs_fmt("Caching actor %s", snac->actor));
1432 object_add_ow(snac->actor, msg);
1433
1329 return msg; 1434 return msg;
1330} 1435}
1331 1436
@@ -1423,8 +1528,9 @@ xs_dict *msg_follow(snac *snac, const char *q)
1423 1528
1424xs_dict *msg_note(snac *snac, const xs_str *content, const xs_val *rcpts, 1529xs_dict *msg_note(snac *snac, const xs_str *content, const xs_val *rcpts,
1425 const xs_str *in_reply_to, const xs_list *attach, 1530 const xs_str *in_reply_to, const xs_list *attach,
1426 int priv, const char *lang_str) 1531 int scope, const char *lang_str)
1427/* creates a 'Note' message */ 1532/* creates a 'Note' message */
1533/* scope: 0, public; 1, private (mentioned only); 2, "quiet public"; 3, followers only */
1428{ 1534{
1429 xs *ntid = tid(0); 1535 xs *ntid = tid(0);
1430 xs *id = xs_fmt("%s/p/%s", snac->actor, ntid); 1536 xs *id = xs_fmt("%s/p/%s", snac->actor, ntid);
@@ -1440,6 +1546,9 @@ xs_dict *msg_note(snac *snac, const xs_str *content, const xs_val *rcpts,
1440 xs_list *p; 1546 xs_list *p;
1441 const xs_val *v; 1547 const xs_val *v;
1442 1548
1549 /* FIXME: implement scope 3 */
1550 int priv = scope == 1;
1551
1443 if (rcpts == NULL) 1552 if (rcpts == NULL)
1444 to = xs_list_new(); 1553 to = xs_list_new();
1445 else { 1554 else {
@@ -1557,6 +1666,12 @@ xs_dict *msg_note(snac *snac, const xs_str *content, const xs_val *rcpts,
1557 } 1666 }
1558 } 1667 }
1559 1668
1669 if (scope == 2) {
1670 /* Mastodon's "quiet public": add public address to cc */
1671 if (xs_list_in(cc, public_address) == -1)
1672 cc = xs_list_append(cc, public_address);
1673 }
1674 else
1560 /* no recipients? must be for everybody */ 1675 /* no recipients? must be for everybody */
1561 if (!priv && xs_list_len(to) == 0) 1676 if (!priv && xs_list_len(to) == 0)
1562 to = xs_list_append(to, public_address); 1677 to = xs_list_append(to, public_address);
@@ -1845,6 +1960,17 @@ int process_input_message(snac *snac, const xs_dict *msg, const xs_dict *req)
1845 /* reject uninteresting messages right now */ 1960 /* reject uninteresting messages right now */
1846 if (xs_match(type, "Add|View|Reject|Read|Remove")) { 1961 if (xs_match(type, "Add|View|Reject|Read|Remove")) {
1847 srv_debug(0, xs_fmt("Ignored message of type '%s'", type)); 1962 srv_debug(0, xs_fmt("Ignored message of type '%s'", type));
1963
1964 /* archive the ignored activity */
1965 xs *ntid = tid(0);
1966 xs *fn = xs_fmt("%s/ignored/%s.json", srv_basedir, ntid);
1967 FILE *f;
1968
1969 if ((f = fopen(fn, "w")) != NULL) {
1970 xs_json_dump(msg, 4, f);
1971 fclose(f);
1972 }
1973
1848 return -1; 1974 return -1;
1849 } 1975 }
1850 1976
@@ -2118,14 +2244,14 @@ int process_input_message(snac *snac, const xs_dict *msg, const xs_dict *req)
2118 snac_log(snac, xs_fmt("new 'Question' %s %s", actor, id)); 2244 snac_log(snac, xs_fmt("new 'Question' %s %s", actor, id));
2119 } 2245 }
2120 else 2246 else
2121 if (strcmp(utype, "Video") == 0) { /** **/ 2247 if (xs_match(utype, "Audio|Video|Event")) { /** **/
2122 const char *id = xs_dict_get(object, "id"); 2248 const char *id = xs_dict_get(object, "id");
2123 2249
2124 if (xs_is_null(id)) 2250 if (xs_is_null(id))
2125 snac_log(snac, xs_fmt("malformed message: no 'id' field")); 2251 snac_log(snac, xs_fmt("malformed message: no 'id' field"));
2126 else 2252 else
2127 if (timeline_add(snac, id, object)) 2253 if (timeline_add(snac, id, object))
2128 snac_log(snac, xs_fmt("new 'Video' %s %s", actor, id)); 2254 snac_log(snac, xs_fmt("new '%s' %s %s", utype, actor, id));
2129 } 2255 }
2130 else 2256 else
2131 snac_debug(snac, 1, xs_fmt("ignored 'Create' for object type '%s'", utype)); 2257 snac_debug(snac, 1, xs_fmt("ignored 'Create' for object type '%s'", utype));
@@ -2203,15 +2329,23 @@ int process_input_message(snac *snac, const xs_dict *msg, const xs_dict *req)
2203 xs *who_o = NULL; 2329 xs *who_o = NULL;
2204 2330
2205 if (valid_status(actor_request(snac, who, &who_o))) { 2331 if (valid_status(actor_request(snac, who, &who_o))) {
2206 if (timeline_admire(snac, object, actor, 0) == HTTP_STATUS_CREATED) 2332 /* don't account as such announces by our own relay */
2207 snac_log(snac, xs_fmt("new 'Announce' %s %s", actor, object)); 2333 xs *this_relay = xs_fmt("%s/relay", srv_baseurl);
2208 else 2334
2209 snac_log(snac, xs_fmt("repeated 'Announce' from %s to %s", 2335 if (strcmp(actor, this_relay) != 0) {
2210 actor, object)); 2336 if (timeline_admire(snac, object, actor, 0) == HTTP_STATUS_CREATED)
2337 snac_log(snac, xs_fmt("new 'Announce' %s %s", actor, object));
2338 else
2339 snac_log(snac, xs_fmt("repeated 'Announce' from %s to %s",
2340 actor, object));
2341 }
2211 2342
2212 /* distribute the post with the actor as 'proxy' */ 2343 /* distribute the post with the actor as 'proxy' */
2213 list_distribute(snac, actor, a_msg); 2344 list_distribute(snac, actor, a_msg);
2214 2345
2346 /* distribute the post to users following these hashtags */
2347 followed_hashtag_distribute(a_msg);
2348
2215 do_notify = 1; 2349 do_notify = 1;
2216 } 2350 }
2217 else 2351 else
@@ -2226,14 +2360,14 @@ int process_input_message(snac *snac, const xs_dict *msg, const xs_dict *req)
2226 } 2360 }
2227 else 2361 else
2228 if (strcmp(type, "Update") == 0) { /** **/ 2362 if (strcmp(type, "Update") == 0) { /** **/
2229 if (xs_match(utype, "Person|Service")) { /** **/ 2363 if (xs_match(utype, "Person|Service|Application")) { /** **/
2230 actor_add(actor, xs_dict_get(msg, "object")); 2364 actor_add(actor, xs_dict_get(msg, "object"));
2231 timeline_touch(snac); 2365 timeline_touch(snac);
2232 2366
2233 snac_log(snac, xs_fmt("updated actor %s", actor)); 2367 snac_log(snac, xs_fmt("updated actor %s", actor));
2234 } 2368 }
2235 else 2369 else
2236 if (xs_match(utype, "Note|Page|Article|Video")) { /** **/ 2370 if (xs_match(utype, "Note|Page|Article|Video|Audio|Event")) { /** **/
2237 const char *id = xs_dict_get(object, "id"); 2371 const char *id = xs_dict_get(object, "id");
2238 2372
2239 if (xs_is_null(id)) 2373 if (xs_is_null(id))
@@ -2419,7 +2553,7 @@ int send_email(const char *msg)
2419} 2553}
2420 2554
2421 2555
2422void process_user_queue_item(snac *snac, xs_dict *q_item) 2556void process_user_queue_item(snac *user, xs_dict *q_item)
2423/* processes an item from the user queue */ 2557/* processes an item from the user queue */
2424{ 2558{
2425 const char *type; 2559 const char *type;
@@ -2430,7 +2564,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2430 2564
2431 if (strcmp(type, "message") == 0) { 2565 if (strcmp(type, "message") == 0) {
2432 const xs_dict *msg = xs_dict_get(q_item, "message"); 2566 const xs_dict *msg = xs_dict_get(q_item, "message");
2433 xs *rcpts = recipient_list(snac, msg, 1); 2567 xs *rcpts = recipient_list(user, msg, 1);
2434 xs_set inboxes; 2568 xs_set inboxes;
2435 const xs_str *actor; 2569 const xs_str *actor;
2436 2570
@@ -2439,7 +2573,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2439 /* add this shared inbox first */ 2573 /* add this shared inbox first */
2440 xs *this_shared_inbox = xs_fmt("%s/shared-inbox", srv_baseurl); 2574 xs *this_shared_inbox = xs_fmt("%s/shared-inbox", srv_baseurl);
2441 xs_set_add(&inboxes, this_shared_inbox); 2575 xs_set_add(&inboxes, this_shared_inbox);
2442 enqueue_output(snac, msg, this_shared_inbox, 0, 0); 2576 enqueue_output(user, msg, this_shared_inbox, 0, 0);
2443 2577
2444 /* iterate the recipients */ 2578 /* iterate the recipients */
2445 xs_list_foreach(rcpts, actor) { 2579 xs_list_foreach(rcpts, actor) {
@@ -2450,10 +2584,10 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2450 if (inbox != NULL) { 2584 if (inbox != NULL) {
2451 /* add to the set and, if it's not there, send message */ 2585 /* add to the set and, if it's not there, send message */
2452 if (xs_set_add(&inboxes, inbox) == 1) 2586 if (xs_set_add(&inboxes, inbox) == 1)
2453 enqueue_output(snac, msg, inbox, 0, 0); 2587 enqueue_output(user, msg, inbox, 0, 0);
2454 } 2588 }
2455 else 2589 else
2456 snac_log(snac, xs_fmt("cannot find inbox for %s", actor)); 2590 snac_log(user, xs_fmt("cannot find inbox for %s", actor));
2457 } 2591 }
2458 } 2592 }
2459 2593
@@ -2465,12 +2599,36 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2465 2599
2466 xs_list_foreach(shibx, inbox) { 2600 xs_list_foreach(shibx, inbox) {
2467 if (xs_set_add(&inboxes, inbox) == 1) 2601 if (xs_set_add(&inboxes, inbox) == 1)
2468 enqueue_output(snac, msg, inbox, 0, 0); 2602 enqueue_output(user, msg, inbox, 0, 0);
2469 } 2603 }
2470 } 2604 }
2471 } 2605 }
2472 2606
2473 xs_set_free(&inboxes); 2607 xs_set_free(&inboxes);
2608
2609 /* relay this note */
2610 if (is_msg_public(msg) && strcmp(user->uid, "relay") != 0) { /* avoid loops */
2611 snac relay;
2612 if (user_open(&relay, "relay")) {
2613 /* a 'relay' user exists */
2614 const char *type = xs_dict_get(msg, "type");
2615
2616 if (xs_is_string(type) && strcmp(type, "Create") == 0) {
2617 const xs_val *object = xs_dict_get(msg, "object");
2618
2619 if (xs_is_dict(object)) {
2620 object = xs_dict_get(object, "id");
2621
2622 snac_debug(&relay, 1, xs_fmt("relaying message %s", object));
2623
2624 xs *boost = msg_admiration(&relay, object, "Announce");
2625 enqueue_message(&relay, boost);
2626 }
2627 }
2628
2629 user_free(&relay);
2630 }
2631 }
2474 } 2632 }
2475 else 2633 else
2476 if (strcmp(type, "input") == 0) { 2634 if (strcmp(type, "input") == 0) {
@@ -2482,13 +2640,13 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2482 if (xs_is_null(msg)) 2640 if (xs_is_null(msg))
2483 return; 2641 return;
2484 2642
2485 if (!process_input_message(snac, msg, req)) { 2643 if (!process_input_message(user, msg, req)) {
2486 if (retries > queue_retry_max) 2644 if (retries > queue_retry_max)
2487 snac_log(snac, xs_fmt("input giving up")); 2645 snac_log(user, xs_fmt("input giving up"));
2488 else { 2646 else {
2489 /* reenqueue */ 2647 /* reenqueue */
2490 enqueue_input(snac, msg, req, retries + 1); 2648 enqueue_input(user, msg, req, retries + 1);
2491 snac_log(snac, xs_fmt("input requeue #%d", retries + 1)); 2649 snac_log(user, xs_fmt("input requeue #%d", retries + 1));
2492 } 2650 }
2493 } 2651 }
2494 } 2652 }
@@ -2498,7 +2656,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2498 const char *id = xs_dict_get(q_item, "message"); 2656 const char *id = xs_dict_get(q_item, "message");
2499 2657
2500 if (!xs_is_null(id)) 2658 if (!xs_is_null(id))
2501 update_question(snac, id); 2659 update_question(user, id);
2502 } 2660 }
2503 else 2661 else
2504 if (strcmp(type, "object_request") == 0) { 2662 if (strcmp(type, "object_request") == 0) {
@@ -2508,17 +2666,17 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2508 int status; 2666 int status;
2509 xs *data = NULL; 2667 xs *data = NULL;
2510 2668
2511 status = activitypub_request(snac, id, &data); 2669 status = activitypub_request(user, id, &data);
2512 2670
2513 if (valid_status(status)) 2671 if (valid_status(status))
2514 object_add_ow(id, data); 2672 object_add_ow(id, data);
2515 2673
2516 snac_debug(snac, 1, xs_fmt("object_request %s %d", id, status)); 2674 snac_debug(user, 1, xs_fmt("object_request %s %d", id, status));
2517 } 2675 }
2518 } 2676 }
2519 else 2677 else
2520 if (strcmp(type, "verify_links") == 0) { 2678 if (strcmp(type, "verify_links") == 0) {
2521 verify_links(snac); 2679 verify_links(user);
2522 } 2680 }
2523 else 2681 else
2524 if (strcmp(type, "actor_refresh") == 0) { 2682 if (strcmp(type, "actor_refresh") == 0) {
@@ -2530,16 +2688,16 @@ void process_user_queue_item(snac *snac, xs_dict *q_item)
2530 xs *actor_o = NULL; 2688 xs *actor_o = NULL;
2531 int status; 2689 int status;
2532 2690
2533 if (valid_status((status = activitypub_request(snac, actor, &actor_o)))) 2691 if (valid_status((status = activitypub_request(user, actor, &actor_o))))
2534 actor_add(actor, actor_o); 2692 actor_add(actor, actor_o);
2535 else 2693 else
2536 object_touch(actor); 2694 object_touch(actor);
2537 2695
2538 snac_log(snac, xs_fmt("actor_refresh %s %d", actor, status)); 2696 snac_log(user, xs_fmt("actor_refresh %s %d", actor, status));
2539 } 2697 }
2540 } 2698 }
2541 else 2699 else
2542 snac_log(snac, xs_fmt("unexpected user q_item type '%s'", type)); 2700 snac_log(user, xs_fmt("unexpected user q_item type '%s'", type));
2543} 2701}
2544 2702
2545 2703
@@ -2640,7 +2798,7 @@ void process_queue_item(xs_dict *q_item)
2640 || status == HTTP_STATUS_UNPROCESSABLE_CONTENT 2798 || status == HTTP_STATUS_UNPROCESSABLE_CONTENT
2641 || status < 0) 2799 || status < 0)
2642 /* explicit error: discard */ 2800 /* explicit error: discard */
2643 srv_log(xs_fmt("output message: fatal error %s %d", inbox, status)); 2801 srv_log(xs_fmt("output message: error %s %d", inbox, status));
2644 else 2802 else
2645 if (retries > queue_retry_max) 2803 if (retries > queue_retry_max)
2646 srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); 2804 srv_log(xs_fmt("output message: giving up %s %d", inbox, status));
@@ -2769,11 +2927,12 @@ void process_queue_item(xs_dict *q_item)
2769 snac user; 2927 snac user;
2770 2928
2771 if (user_open(&user, v)) { 2929 if (user_open(&user, v)) {
2772 if (is_msg_for_me(&user, msg)) { 2930 int rsn = is_msg_for_me(&user, msg);
2931 if (rsn) {
2773 xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); 2932 xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
2774 2933
2775 snac_debug(&user, 1, 2934 snac_debug(&user, 1,
2776 xs_fmt("enqueue_input (from shared inbox) %s", xs_dict_get(msg, "id"))); 2935 xs_fmt("enqueue_input (from shared inbox) %s [%d]", xs_dict_get(msg, "id"), rsn));
2777 2936
2778 if (link(tmpfn, fn) < 0) 2937 if (link(tmpfn, fn) < 0)
2779 srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); 2938 srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));