summaryrefslogtreecommitdiff
path: root/activitypub.c
diff options
context:
space:
mode:
Diffstat (limited to 'activitypub.c')
-rw-r--r--activitypub.c73
1 files changed, 47 insertions, 26 deletions
diff --git a/activitypub.c b/activitypub.c
index 55a245e..d7f5b37 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item)
2052 } 2052 }
2053 else 2053 else
2054 if (strcmp(type, "input") == 0) { 2054 if (strcmp(type, "input") == 0) {
2055 /* redistribute the input message to all users */
2056 char *ntid = xs_dict_get(q_item, "ntid");
2057 xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
2058 xs_dict *msg = xs_dict_get(q_item, "message"); 2055 xs_dict *msg = xs_dict_get(q_item, "message");
2059 FILE *f; 2056 xs_dict *req = xs_dict_get(q_item, "req");
2057 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
2058
2059 /* do some instance-level checks */
2060 int r = process_input_message(NULL, msg, req);
2060 2061
2061 if ((f = fopen(tmpfn, "w")) != NULL) { 2062 if (r == 0) {
2062 xs_json_dump(q_item, 4, f); 2063 /* transient error? retry */
2063 fclose(f); 2064 int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max"));
2065
2066 if (retries > queue_retry_max)
2067 srv_log(xs_fmt("shared input giving up"));
2068 else {
2069 /* reenqueue */
2070 enqueue_shared_input(msg, req, retries + 1);
2071 srv_log(xs_fmt("shared input requeue #%d", retries + 1));
2072 }
2064 } 2073 }
2074 else
2075 if (r == 2) {
2076 /* redistribute the input message to all users */
2077 char *ntid = xs_dict_get(q_item, "ntid");
2078 xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid);
2079 FILE *f;
2080
2081 if ((f = fopen(tmpfn, "w")) != NULL) {
2082 xs_json_dump(q_item, 4, f);
2083 fclose(f);
2084 }
2065 2085
2066 xs *users = user_list(); 2086 xs *users = user_list();
2067 xs_list *p = users; 2087 xs_list *p = users;
2068 char *v; 2088 char *v;
2069 int cnt = 0; 2089 int cnt = 0;
2070 2090
2071 while (xs_list_iter(&p, &v)) { 2091 while (xs_list_iter(&p, &v)) {
2072 snac user; 2092 snac user;
2073 2093
2074 if (user_open(&user, v)) { 2094 if (user_open(&user, v)) {
2075 if (is_msg_for_me(&user, msg)) { 2095 if (is_msg_for_me(&user, msg)) {
2076 xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); 2096 xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid);
2077 2097
2078 snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); 2098 snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn));
2079 2099
2080 if (link(tmpfn, fn) < 0) 2100 if (link(tmpfn, fn) < 0)
2081 srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); 2101 srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn));
2082 2102
2083 cnt++; 2103 cnt++;
2084 } 2104 }
2085 2105
2086 user_free(&user); 2106 user_free(&user);
2107 }
2087 } 2108 }
2088 }
2089 2109
2090 unlink(tmpfn); 2110 unlink(tmpfn);
2091 2111
2092 if (cnt == 0) 2112 if (cnt == 0)
2093 srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); 2113 srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn));
2114 }
2094 } 2115 }
2095 else 2116 else
2096 srv_log(xs_fmt("unexpected q_item type '%s'", type)); 2117 srv_log(xs_fmt("unexpected q_item type '%s'", type));