summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar default2022-09-28 20:41:07 +0200
committerGravatar default2022-09-28 20:41:07 +0200
commit2be2c07e9c930fd4582feb3cb02162c8b3785000 (patch)
treeec4f141364de69fb790371e6b8ed2431782e9c98
parentIf an actor cannot be retrieved, move the message back to the queue. (diff)
downloadsnac2-2be2c07e9c930fd4582feb3cb02162c8b3785000.tar.gz
snac2-2be2c07e9c930fd4582feb3cb02162c8b3785000.tar.xz
snac2-2be2c07e9c930fd4582feb3cb02162c8b3785000.zip
The input queue also has retries.
-rw-r--r--activitypub.c21
-rw-r--r--data.c13
-rw-r--r--snac.h2
3 files changed, 24 insertions, 12 deletions
diff --git a/activitypub.c b/activitypub.c
index 20c734b..8127e8d 100644
--- a/activitypub.c
+++ b/activitypub.c
@@ -557,7 +557,7 @@ d_char *msg_note(snac *snac, char *content, char *rcpts, char *in_reply_to)
557 557
558/** queues **/ 558/** queues **/
559 559
560void process_message(snac *snac, char *msg, char *req) 560int process_message(snac *snac, char *msg, char *req)
561/* processes an ActivityPub message from the input queue */ 561/* processes an ActivityPub message from the input queue */
562{ 562{
563 /* actor and type exist, were checked previously */ 563 /* actor and type exist, were checked previously */
@@ -575,10 +575,8 @@ void process_message(snac *snac, char *msg, char *req)
575 575
576 /* bring the actor */ 576 /* bring the actor */
577 if (!valid_status(actor_request(snac, actor, &actor_o))) { 577 if (!valid_status(actor_request(snac, actor, &actor_o))) {
578 /* error: re-enqueue to try later */
579 enqueue_input(snac, msg, req);
580 snac_log(snac, xs_fmt("error requesting actor %s -- retry later", actor)); 578 snac_log(snac, xs_fmt("error requesting actor %s -- retry later", actor));
581 return; 579 return 0;
582 } 580 }
583 581
584 /* check the signature */ 582 /* check the signature */
@@ -690,6 +688,8 @@ void process_message(snac *snac, char *msg, char *req)
690*/ 688*/
691 else 689 else
692 snac_debug(snac, 1, xs_fmt("process_message type '%s' ignored", type)); 690 snac_debug(snac, 1, xs_fmt("process_message type '%s' ignored", type));
691
692 return 1;
693} 693}
694 694
695 695
@@ -742,8 +742,17 @@ void process_queue(snac *snac)
742 /* process the message */ 742 /* process the message */
743 char *msg = xs_dict_get(q_item, "object"); 743 char *msg = xs_dict_get(q_item, "object");
744 char *req = xs_dict_get(q_item, "req"); 744 char *req = xs_dict_get(q_item, "req");
745 int retries = xs_number_get(xs_dict_get(q_item, "retries"));
745 746
746 process_message(snac, msg, req); 747 if (!process_message(snac, msg, req)) {
748 if (retries > queue_retry_max)
749 snac_log(snac, xs_fmt("process_queue input giving up"));
750 else {
751 /* reenqueue */
752 enqueue_input(snac, msg, req, retries + 1);
753 snac_log(snac, xs_fmt("process_queue input requeue %d", retries + 1));
754 }
755 }
747 } 756 }
748 } 757 }
749} 758}
@@ -907,7 +916,7 @@ int activitypub_post_handler(d_char *req, char *q_path,
907 } 916 }
908 917
909 if (valid_status(status)) { 918 if (valid_status(status)) {
910 enqueue_input(&snac, msg, req); 919 enqueue_input(&snac, msg, req, 0);
911 *ctype = "application/activity+json"; 920 *ctype = "application/activity+json";
912 } 921 }
913 922
diff --git a/data.c b/data.c
index c4ade59..ee21a98 100644
--- a/data.c
+++ b/data.c
@@ -831,21 +831,24 @@ int static_get(snac *snac, char *id, d_char **data, int *size)
831} 831}
832 832
833 833
834void enqueue_input(snac *snac, char *msg, char *req) 834void enqueue_input(snac *snac, char *msg, char *req, int retries)
835/* enqueues an input message */ 835/* enqueues an input message */
836{ 836{
837 xs *ntid = tid(0); 837 int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes"));
838 xs *ntid = tid(retries * 60 * qrt);
838 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); 839 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
839 xs *tfn = xs_fmt("%s.tmp", fn); 840 xs *tfn = xs_fmt("%s.tmp", fn);
840 FILE *f; 841 FILE *f;
841 842
842 if ((f = fopen(tfn, "w")) != NULL) { 843 if ((f = fopen(tfn, "w")) != NULL) {
843 xs *qmsg = xs_dict_new(); 844 xs *qmsg = xs_dict_new();
845 xs *rn = xs_number_new(retries);
844 xs *j; 846 xs *j;
845 847
846 qmsg = xs_dict_append(qmsg, "type", "input"); 848 qmsg = xs_dict_append(qmsg, "type", "input");
847 qmsg = xs_dict_append(qmsg, "object", msg); 849 qmsg = xs_dict_append(qmsg, "object", msg);
848 qmsg = xs_dict_append(qmsg, "req", req); 850 qmsg = xs_dict_append(qmsg, "req", req);
851 qmsg = xs_dict_append(qmsg, "retries", rn);
849 852
850 j = xs_json_dumps_pp(qmsg, 4); 853 j = xs_json_dumps_pp(qmsg, 4);
851 854
diff --git a/snac.h b/snac.h
index 60322d4..c65b222 100644
--- a/snac.h
+++ b/snac.h
@@ -83,7 +83,7 @@ int actor_get(snac *snac, char *actor, d_char **data);
83 83
84int static_get(snac *snac, char *id, d_char **data, int *size); 84int static_get(snac *snac, char *id, d_char **data, int *size);
85 85
86void enqueue_input(snac *snac, char *msg, char *req); 86void enqueue_input(snac *snac, char *msg, char *req, int retries);
87void enqueue_output(snac *snac, char *msg, char *actor, int retries); 87void enqueue_output(snac *snac, char *msg, char *actor, int retries);
88 88
89d_char *queue(snac *snac); 89d_char *queue(snac *snac);