diff options
| -rw-r--r-- | activitypub.c | 17 | ||||
| -rw-r--r-- | data.c | 11 | ||||
| -rw-r--r-- | snac.h | 4 |
3 files changed, 21 insertions, 11 deletions
diff --git a/activitypub.c b/activitypub.c index a8a570c..514bf5d 100644 --- a/activitypub.c +++ b/activitypub.c | |||
| @@ -1798,7 +1798,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) | |||
| 1798 | if (inbox != NULL) { | 1798 | if (inbox != NULL) { |
| 1799 | /* add to the set and, if it's not there, send message */ | 1799 | /* add to the set and, if it's not there, send message */ |
| 1800 | if (xs_set_add(&inboxes, inbox) == 1) | 1800 | if (xs_set_add(&inboxes, inbox) == 1) |
| 1801 | enqueue_output(snac, msg, inbox, 0); | 1801 | enqueue_output(snac, msg, inbox, 0, 0); |
| 1802 | } | 1802 | } |
| 1803 | else | 1803 | else |
| 1804 | snac_log(snac, xs_fmt("cannot find inbox for %s", actor)); | 1804 | snac_log(snac, xs_fmt("cannot find inbox for %s", actor)); |
| @@ -1812,7 +1812,7 @@ void process_user_queue_item(snac *snac, xs_dict *q_item) | |||
| 1812 | p = shibx; | 1812 | p = shibx; |
| 1813 | while (xs_list_iter(&p, &inbox)) { | 1813 | while (xs_list_iter(&p, &inbox)) { |
| 1814 | if (xs_set_add(&inboxes, inbox) == 1) | 1814 | if (xs_set_add(&inboxes, inbox) == 1) |
| 1815 | enqueue_output(snac, msg, inbox, 0); | 1815 | enqueue_output(snac, msg, inbox, 0, 0); |
| 1816 | } | 1816 | } |
| 1817 | } | 1817 | } |
| 1818 | 1818 | ||
| @@ -1896,6 +1896,7 @@ void process_queue_item(xs_dict *q_item) | |||
| 1896 | xs_str *seckey = xs_dict_get(q_item, "seckey"); | 1896 | xs_str *seckey = xs_dict_get(q_item, "seckey"); |
| 1897 | xs_dict *msg = xs_dict_get(q_item, "message"); | 1897 | xs_dict *msg = xs_dict_get(q_item, "message"); |
| 1898 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); | 1898 | int retries = xs_number_get(xs_dict_get(q_item, "retries")); |
| 1899 | int p_status = xs_number_get(xs_dict_get(q_item, "p_status")); | ||
| 1899 | xs *payload = NULL; | 1900 | xs *payload = NULL; |
| 1900 | int p_size = 0; | 1901 | int p_size = 0; |
| 1901 | 1902 | ||
| @@ -1909,8 +1910,9 @@ void process_queue_item(xs_dict *q_item) | |||
| 1909 | return; | 1910 | return; |
| 1910 | } | 1911 | } |
| 1911 | 1912 | ||
| 1912 | /* deliver */ | 1913 | /* deliver (if previous error status was a timeout, try now longer) */ |
| 1913 | status = send_to_inbox_raw(keyid, seckey, inbox, msg, &payload, &p_size, retries == 0 ? 3 : 8); | 1914 | status = send_to_inbox_raw(keyid, seckey, inbox, msg, |
| 1915 | &payload, &p_size, p_status == 599 ? 20 : 3); | ||
| 1914 | 1916 | ||
| 1915 | if (payload) { | 1917 | if (payload) { |
| 1916 | if (p_size > 64) { | 1918 | if (p_size > 64) { |
| @@ -1934,6 +1936,11 @@ void process_queue_item(xs_dict *q_item) | |||
| 1934 | if (!valid_status(status)) { | 1936 | if (!valid_status(status)) { |
| 1935 | retries++; | 1937 | retries++; |
| 1936 | 1938 | ||
| 1939 | /* if it's not the first time it fails with a timeout, | ||
| 1940 | penalize the server by skipping one retry */ | ||
| 1941 | if (p_status == status && status == 499) | ||
| 1942 | retries++; | ||
| 1943 | |||
| 1937 | /* error sending; requeue? */ | 1944 | /* error sending; requeue? */ |
| 1938 | if (status == 400 || status == 404 || status == 410 || status < 0) | 1945 | if (status == 400 || status == 404 || status == 410 || status < 0) |
| 1939 | /* explicit error: discard */ | 1946 | /* explicit error: discard */ |
| @@ -1943,7 +1950,7 @@ void process_queue_item(xs_dict *q_item) | |||
| 1943 | srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); | 1950 | srv_log(xs_fmt("output message: giving up %s %d", inbox, status)); |
| 1944 | else { | 1951 | else { |
| 1945 | /* requeue */ | 1952 | /* requeue */ |
| 1946 | enqueue_output_raw(keyid, seckey, msg, inbox, retries); | 1953 | enqueue_output_raw(keyid, seckey, msg, inbox, retries, status); |
| 1947 | srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); | 1954 | srv_log(xs_fmt("output message: requeue %s #%d", inbox, retries)); |
| 1948 | } | 1955 | } |
| 1949 | } | 1956 | } |
| @@ -2041,13 +2041,16 @@ void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retri | |||
| 2041 | 2041 | ||
| 2042 | 2042 | ||
| 2043 | void enqueue_output_raw(const char *keyid, const char *seckey, | 2043 | void enqueue_output_raw(const char *keyid, const char *seckey, |
| 2044 | xs_dict *msg, xs_str *inbox, int retries) | 2044 | xs_dict *msg, xs_str *inbox, int retries, int p_status) |
| 2045 | /* enqueues an output message to an inbox */ | 2045 | /* enqueues an output message to an inbox */ |
| 2046 | { | 2046 | { |
| 2047 | xs *qmsg = _new_qmsg("output", msg, retries); | 2047 | xs *qmsg = _new_qmsg("output", msg, retries); |
| 2048 | char *ntid = xs_dict_get(qmsg, "ntid"); | 2048 | char *ntid = xs_dict_get(qmsg, "ntid"); |
| 2049 | xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); | 2049 | xs *fn = xs_fmt("%s/queue/%s.json", srv_basedir, ntid); |
| 2050 | 2050 | ||
| 2051 | xs *ns = xs_number_new(p_status); | ||
| 2052 | qmsg = xs_dict_append(qmsg, "p_status", ns); | ||
| 2053 | |||
| 2051 | qmsg = xs_dict_append(qmsg, "inbox", inbox); | 2054 | qmsg = xs_dict_append(qmsg, "inbox", inbox); |
| 2052 | qmsg = xs_dict_append(qmsg, "keyid", keyid); | 2055 | qmsg = xs_dict_append(qmsg, "keyid", keyid); |
| 2053 | qmsg = xs_dict_append(qmsg, "seckey", seckey); | 2056 | qmsg = xs_dict_append(qmsg, "seckey", seckey); |
| @@ -2062,7 +2065,7 @@ void enqueue_output_raw(const char *keyid, const char *seckey, | |||
| 2062 | } | 2065 | } |
| 2063 | 2066 | ||
| 2064 | 2067 | ||
| 2065 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) | 2068 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status) |
| 2066 | /* enqueues an output message to an inbox */ | 2069 | /* enqueues an output message to an inbox */ |
| 2067 | { | 2070 | { |
| 2068 | if (xs_startswith(inbox, snac->actor)) { | 2071 | if (xs_startswith(inbox, snac->actor)) { |
| @@ -2072,7 +2075,7 @@ void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries) | |||
| 2072 | 2075 | ||
| 2073 | char *seckey = xs_dict_get(snac->key, "secret"); | 2076 | char *seckey = xs_dict_get(snac->key, "secret"); |
| 2074 | 2077 | ||
| 2075 | enqueue_output_raw(snac->actor, seckey, msg, inbox, retries); | 2078 | enqueue_output_raw(snac->actor, seckey, msg, inbox, retries, p_status); |
| 2076 | } | 2079 | } |
| 2077 | 2080 | ||
| 2078 | 2081 | ||
| @@ -2082,7 +2085,7 @@ void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int | |||
| 2082 | xs *inbox = get_actor_inbox(snac, actor); | 2085 | xs *inbox = get_actor_inbox(snac, actor); |
| 2083 | 2086 | ||
| 2084 | if (!xs_is_null(inbox)) | 2087 | if (!xs_is_null(inbox)) |
| 2085 | enqueue_output(snac, msg, inbox, retries); | 2088 | enqueue_output(snac, msg, inbox, retries, 0); |
| 2086 | else | 2089 | else |
| 2087 | snac_log(snac, xs_fmt("enqueue_output_by_actor cannot get inbox %s", actor)); | 2090 | snac_log(snac, xs_fmt("enqueue_output_by_actor cannot get inbox %s", actor)); |
| 2088 | } | 2091 | } |
| @@ -178,8 +178,8 @@ int instance_unblock(const char *instance); | |||
| 178 | 178 | ||
| 179 | void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retries); | 179 | void enqueue_input(snac *snac, const xs_dict *msg, const xs_dict *req, int retries); |
| 180 | void enqueue_output_raw(const char *keyid, const char *seckey, | 180 | void enqueue_output_raw(const char *keyid, const char *seckey, |
| 181 | xs_dict *msg, xs_str *inbox, int retries); | 181 | xs_dict *msg, xs_str *inbox, int retries, int p_status); |
| 182 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries); | 182 | void enqueue_output(snac *snac, xs_dict *msg, xs_str *inbox, int retries, int p_status); |
| 183 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int retries); | 183 | void enqueue_output_by_actor(snac *snac, xs_dict *msg, const xs_str *actor, int retries); |
| 184 | void enqueue_email(xs_str *msg, int retries); | 184 | void enqueue_email(xs_str *msg, int retries); |
| 185 | void enqueue_telegram(const xs_str *msg, const char *bot, const char *chat_id); | 185 | void enqueue_telegram(const xs_str *msg, const char *bot, const char *chat_id); |