summaryrefslogtreecommitdiff
path: root/data.c
diff options
context:
space:
mode:
authorGravatar default2022-10-11 08:16:58 +0200
committerGravatar default2022-10-11 08:16:58 +0200
commite8fbc94089b70d9fdd7e3b25ad809dfea454770a (patch)
tree43522058393a703b4354a42747fd093f6e3c344f /data.c
parentUpdated TODO. (diff)
downloadsnac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.tar.gz
snac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.tar.xz
snac2-e8fbc94089b70d9fdd7e3b25ad809dfea454770a.zip
Unify enqueueing code.
Diffstat (limited to 'data.c')
-rw-r--r--data.c74
1 files changed, 37 insertions, 37 deletions
diff --git a/data.c b/data.c
index 893a91d..ac8c59b 100644
--- a/data.c
+++ b/data.c
@@ -892,34 +892,45 @@ d_char *history_list(snac *snac)
892} 892}
893 893
894 894
895void enqueue_input(snac *snac, char *msg, char *req, int retries) 895static int _enqueue_put(char *fn, char *msg)
896/* enqueues an input message */ 896/* writes safely to the queue */
897{ 897{
898 int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); 898 int ret = 1;
899 xs *ntid = tid(retries * 60 * qrt); 899 xs *tfn = xs_fmt("%s.tmp", fn);
900 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
901 xs *tfn = xs_fmt("%s.tmp", fn);
902 FILE *f; 900 FILE *f;
903 901
904 if ((f = fopen(tfn, "w")) != NULL) { 902 if ((f = fopen(tfn, "w")) != NULL) {
905 xs *qmsg = xs_dict_new(); 903 xs *j = xs_json_dumps_pp(msg, 4);
906 xs *rn = xs_number_new(retries);
907 xs *j;
908
909 qmsg = xs_dict_append(qmsg, "type", "input");
910 qmsg = xs_dict_append(qmsg, "object", msg);
911 qmsg = xs_dict_append(qmsg, "req", req);
912 qmsg = xs_dict_append(qmsg, "retries", rn);
913
914 j = xs_json_dumps_pp(qmsg, 4);
915 904
916 fwrite(j, strlen(j), 1, f); 905 fwrite(j, strlen(j), 1, f);
917 fclose(f); 906 fclose(f);
918 907
919 rename(tfn, fn); 908 rename(tfn, fn);
920
921 snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn));
922 } 909 }
910 else
911 ret = 0;
912
913 return ret;
914}
915
916
917void enqueue_input(snac *snac, char *msg, char *req, int retries)
918/* enqueues an input message */
919{
920 int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes"));
921 xs *ntid = tid(retries * 60 * qrt);
922 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
923 xs *qmsg = xs_dict_new();
924 xs *rn = xs_number_new(retries);
925
926 qmsg = xs_dict_append(qmsg, "type", "input");
927 qmsg = xs_dict_append(qmsg, "object", msg);
928 qmsg = xs_dict_append(qmsg, "req", req);
929 qmsg = xs_dict_append(qmsg, "retries", rn);
930
931 _enqueue_put(fn, qmsg);
932
933 snac_debug(snac, 1, xs_fmt("enqueue_input %s", fn));
923} 934}
924 935
925 936
@@ -934,28 +945,17 @@ void enqueue_output(snac *snac, char *msg, char *actor, int retries)
934 int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes")); 945 int qrt = xs_number_get(xs_dict_get(srv_config, "queue_retry_minutes"));
935 xs *ntid = tid(retries * 60 * qrt); 946 xs *ntid = tid(retries * 60 * qrt);
936 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid); 947 xs *fn = xs_fmt("%s/queue/%s.json", snac->basedir, ntid);
937 xs *tfn = xs_fmt("%s.tmp", fn); 948 xs *qmsg = xs_dict_new();
938 FILE *f; 949 xs *rn = xs_number_new(retries);
939 950
940 if ((f = fopen(tfn, "w")) != NULL) { 951 qmsg = xs_dict_append(qmsg, "type", "output");
941 xs *qmsg = xs_dict_new(); 952 qmsg = xs_dict_append(qmsg, "actor", actor);
942 xs *rn = xs_number_new(retries); 953 qmsg = xs_dict_append(qmsg, "object", msg);
943 xs *j; 954 qmsg = xs_dict_append(qmsg, "retries", rn);
944
945 qmsg = xs_dict_append(qmsg, "type", "output");
946 qmsg = xs_dict_append(qmsg, "actor", actor);
947 qmsg = xs_dict_append(qmsg, "object", msg);
948 qmsg = xs_dict_append(qmsg, "retries", rn);
949
950 j = xs_json_dumps_pp(qmsg, 4);
951
952 fwrite(j, strlen(j), 1, f);
953 fclose(f);
954 955
955 rename(tfn, fn); 956 _enqueue_put(fn, qmsg);
956 957
957 snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries)); 958 snac_debug(snac, 1, xs_fmt("enqueue_output %s %s %d", actor, fn, retries));
958 }
959} 959}
960 960
961 961