diff options
Diffstat (limited to 'activitypub.c')
| -rw-r--r-- | activitypub.c | 73 |
1 files changed, 47 insertions, 26 deletions
diff --git a/activitypub.c b/activitypub.c index 55a245e..d7f5b37 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -2052,45 +2052,66 @@ void process_queue_item(xs_dict *q_item) | |||
| 2052 | } | 2052 | } |
| 2053 | else | 2053 | else |
| 2054 | if (strcmp(type, "input") == 0) { | 2054 | if (strcmp(type, "input") == 0) { |
| 2055 | /* redistribute the input message to all users */ | ||
| 2056 | char *ntid = xs_dict_get(q_item, "ntid"); | ||
| 2057 | xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); | ||
| 2058 | xs_dict *msg = xs_dict_get(q_item, "message"); | 2055 | xs_dict *msg = xs_dict_get(q_item, "message"); |
| 2059 | FILE *f; | 2056 | xs_dict *req = xs_dict_get(q_item, "req"); |
| 2057 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | ||
| 2058 | |||
| 2059 | /* do some instance-level checks */ | ||
| 2060 | int r = process_input_message(NULL, msg, req); | ||
| 2060 | 2061 | ||
| 2061 | if ((f = fopen(tmpfn, "w")) != NULL) { | 2062 | if (r == 0) { |
| 2062 | xs_json_dump(q_item, 4, f); | 2063 | /* transient error? retry */ |
| 2063 | fclose(f); | 2064 | int queue_retry_max = xs_number_get(xs_dict_get(srv_config, "queue_retry_max")); |
| 2065 | |||
| 2066 | if (retries > queue_retry_max) | ||
| 2067 | srv_log(xs_fmt("shared input giving up")); | ||
| 2068 | else { | ||
| 2069 | /* reenqueue */ | ||
| 2070 | enqueue_shared_input(msg, req, retries + 1); | ||
| 2071 | srv_log(xs_fmt("shared input requeue #%d", retries + 1)); | ||
| 2072 | } | ||
| 2064 | } | 2073 | } |
| 2074 | else | ||
| 2075 | if (r == 2) { | ||
| 2076 | /* redistribute the input message to all users */ | ||
| 2077 | char *ntid = xs_dict_get(q_item, "ntid"); | ||
| 2078 | xs *tmpfn = xs_fmt("%s/tmp/%s.json", srv_basedir, ntid); | ||
| 2079 | FILE *f; | ||
| 2080 | |||
| 2081 | if ((f = fopen(tmpfn, "w")) != NULL) { | ||
| 2082 | xs_json_dump(q_item, 4, f); | ||
| 2083 | fclose(f); | ||
| 2084 | } | ||
| 2065 | 2085 | ||
| 2066 | xs *users = user_list(); | 2086 | xs *users = user_list(); |
| 2067 | xs_list *p = users; | 2087 | xs_list *p = users; |
| 2068 | char *v; | 2088 | char *v; |
| 2069 | int cnt = 0; | 2089 | int cnt = 0; |
| 2070 | 2090 | ||
| 2071 | while (xs_list_iter(&p, &v)) { | 2091 | while (xs_list_iter(&p, &v)) { |
| 2072 | snac user; | 2092 | snac user; |
| 2073 | 2093 | ||
| 2074 | if (user_open(&user, v)) { | 2094 | if (user_open(&user, v)) { |
| 2075 | if (is_msg_for_me(&user, msg)) { | 2095 | if (is_msg_for_me(&user, msg)) { |
| 2076 | xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); | 2096 | xs *fn = xs_fmt("%s/queue/%s.json", user.basedir, ntid); |
| 2077 | 2097 | ||
| 2078 | snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); | 2098 | snac_debug(&user, 1, xs_fmt("enqueue_input (from shared inbox) %s", fn)); |
| 2079 | 2099 | ||
| 2080 | if (link(tmpfn, fn) < 0) | 2100 | if (link(tmpfn, fn) < 0) |
| 2081 | srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); | 2101 | srv_log(xs_fmt("link(%s, %s) error", tmpfn, fn)); |
| 2082 | 2102 | ||
| 2083 | cnt++; | 2103 | cnt++; |
| 2084 | } | 2104 | } |
| 2085 | 2105 | ||
| 2086 | user_free(&user); | 2106 | user_free(&user); |
| 2107 | } | ||
| 2087 | } | 2108 | } |
| 2088 | } | ||
| 2089 | 2109 | ||
| 2090 | unlink(tmpfn); | 2110 | unlink(tmpfn); |
| 2091 | 2111 | ||
| 2092 | if (cnt == 0) | 2112 | if (cnt == 0) |
| 2093 | srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); | 2113 | srv_debug(1, xs_fmt("no valid recipients for %s", tmpfn)); |
| 2114 | } | ||
| 2094 | } | 2115 | } |
| 2095 | else | 2116 | else |
| 2096 | srv_log(xs_fmt("unexpected q_item type '%s'", type)); | 2117 | srv_log(xs_fmt("unexpected q_item type '%s'", type)); |