1 /*
2  * Copyright 2018-present Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 ///
17 module aws.lambda_runtime.runtime;
18 
19 import core.time : seconds;
20 import std.datetime.systime : SysTime, Clock;
21 import std.process : environment;
22 import std.conv : to;
23 import std.net.curl;
24 import std.typecons : No;
25 import etc.c.curl : curl_easy_strerror, curl_version;
26 
27 import aws.http.response;
28 import aws.logging.logging;
29 import aws.lambda_runtime.outcome;
30 import aws.lambda_runtime.version_;
31 
32 /// Entry method
33 void runHandler(InvocationResponse function(InvocationRequest) handler)
34 {
35     logInfo(LOG_TAG, "Initializing the D Lambda Runtime version %s", getVersion());
36     string endpoint = "http://";
37     string endpointHost = environment.get("AWS_LAMBDA_RUNTIME_API", "");
38     assert(endpointHost != "", "LAMBDA_SERVER_ADDRESS not defined");
39     logDebug(LOG_TAG, "LAMBDA_SERVER_ADDRESS defined in environment as: %s", endpointHost);
40     endpoint ~= endpointHost;
41 
42     Runtime rt = new Runtime(endpoint);
43     size_t retries = 0;
44     size_t maxRetries = 3;
45 
46     while (retries < maxRetries) {
47         auto nextOutcome = rt.getNext();
48         if (!nextOutcome.isSuccess()) {
49             if (nextOutcome.getFailure() == ResponseCode.REQUEST_NOT_MADE)
50             {
51                 ++retries;
52                 continue;
53             }
54 
55             logInfo(LOG_TAG, "HTTP request was not successful. HTTP response code: %d. Retrying..", nextOutcome.getFailure());
56             ++retries;
57             continue;
58         }
59 
60         retries = 0;
61         auto req = nextOutcome.getResult();
62         logInfo(LOG_TAG, "Invoking user handler");
63         InvocationResponse res = handler(req);
64         logInfo(LOG_TAG, "Invoking user handler completed.");
65 
66         if (res.isSuccess()) {
67             auto postOutcome = rt.postSuccess(req.requestId, res);
68             if (!handlePostOutcome(postOutcome, req.requestId)) 
69             {
70                 return; // TODO: implement a better retry strategy
71             }
72         }
73         else {
74             auto postOutcome = rt.postFailure(req.requestId, res);
75             if (!handlePostOutcome(postOutcome, req.requestId))
76             {
77                 return; // TODO: implement a better retry strategy
78             }
79         }
80     }
81 
82     if (retries == maxRetries)
83     {
84         string libCurlVersion = to!string(curl_version());
85         logError(LOG_TAG, "Exhausted all retries. This is probably a bug in libcurl v" ~ libCurlVersion ~ " Exiting!");
86     }
87 }
88 
89 ///
90 struct InvocationRequest 
91 {
92     /// The user's payload represented as a UTF-8 string.
93     string payload;
94 
95     /// An identifier unique to the current invocation.
96     string requestId;
97 
98     /// X-Ray tracing ID of the current invocation.
99     string xrayTraceId;
100 
101     /// Information about the client application and device when invoked through the AWS Mobile SDK.
102     string clientContext;
103     
104     /// Information about the Amazon Cognito identity provider when invoked through the AWS Mobile SDK.
105     string cognitoIdentity;
106 
107     /// The ARN requested. This can be different in each invoke that executes the same version.
108     string functionArn;
109 
110     /// Function execution deadline counted in milliseconds since the Unix epoch.
111     SysTime deadline;
112 
113     /// The number of milliseconds left before lambda terminates the current execution.
114     long getTimeRemaining() 
115     {
116         return (deadline - Clock.currTime()).total!"msecs";
117     }
118 }
119 
120 ///
121 class InvocationResponse 
122 {
123     /// Create a successful invocation response with the given payload and content-type.
124     static InvocationResponse success(string payload, string contentType)
125     {
126         InvocationResponse r = new InvocationResponse();
127         r._success = true;
128         r._contentType = contentType;
129         r._payload = payload;
130         return r;
131     }
132 
133     /**
134      * Create a failure response with the given error message and error type.
135      * The content-type is always set to application/json in this case.
136      */
137     static InvocationResponse failure(string errorMessage, string errorType)
138     {
139         import std.json;
140         
141         InvocationResponse r = new InvocationResponse();
142         r._success = false;
143         r._contentType = "application/json";
144         JSONValue jsPayload = JSONValue(["errorMessage": JSONValue(errorMessage), "errorType":  JSONValue(errorType), "stackTrace": JSONValue(string[].init)]);
145         r._payload = jsPayload.toString();
146         return r;
147     }
148 
149     /// Get the MIME type of the payload.
150     string getContentType() 
151     { 
152         return _contentType; 
153     }
154 
155     /// Get the payload string. The string is assumed to be UTF-8 encoded.
156     string getPayload() 
157     { 
158         return _payload; 
159     }
160 
161     /// Returns true if the payload and content-type are set. Returns false if the error message and error types are set.
162     bool isSuccess() 
163     { 
164         return _success; 
165     }
166 private:
167     // The output of the function which is sent to the lambda caller.
168     string _payload;
169 
170     // The MIME type of the payload. This is always set to 'application/json' in unsuccessful invocations.
171     string _contentType;
172 
173     // Flag to distinguish if the contents are for successful or unsuccessful invocations.
174     bool _success;
175 }
176 
177 private:
178 
179 enum LOG_TAG = "LAMBDA_RUNTIME";
180 enum REQUEST_ID_HEADER = "lambda-runtime-aws-request-id";
181 enum TRACE_ID_HEADER = "lambda-runtime-trace-id";
182 enum CLIENT_CONTEXT_HEADER = "lambda-runtime-client-context";
183 enum COGNITO_IDENTITY_HEADER = "lambda-runtime-cognito-identity";
184 enum DEADLINE_MS_HEADER = "lambda-runtime-deadline-ms";
185 enum FUNCTION_ARN_HEADER = "lambda-runtime-invoked-function-arn";
186 
187 enum Endpoints
188 {
189     INIT,
190     NEXT,
191     RESULT,
192 }
193 
194 bool isSuccess(ResponseCode httpcode)
195 {
196     return httpcode >= 200 && httpcode <= 299;
197 }
198 
199 void setUserAgentHeader(HTTP http)
200 {
201     static string userAgent = "AWS_Lambda_D/" ~ getVersion();
202     http.setUserAgent(userAgent);
203 }
204 
205 enum CURLE_OK = 0;
206 
207 class Runtime
208 {   
209     alias NextOutcome = Outcome!(InvocationRequest, ResponseCode);
210     alias PostOutcome = Outcome!(NoResult, ResponseCode);
211 
212     private string[] _endpoints;
213     
214     this(string endpoint)
215     {        
216         _endpoints = [
217             endpoint ~ "/2018-06-01/runtime/init/error",
218             endpoint ~ "/2018-06-01/runtime/invocation/next",
219             endpoint ~ "/2018-06-01/runtime/invocation/"
220         ];
221     }
222     
223     // Ask lambda for an invocation.
224     NextOutcome getNext()
225     {
226         Response resp = new Response();
227         
228         auto http = HTTP(_endpoints[Endpoints.NEXT]);
229         http.method = HTTP.Method.get;
230         setUserAgentHeader(http);
231 
232         // lambda freezes the container when no further tasks are available. The freezing period could be longer than the
233         // request timeout, which causes the following get_next request to fail with a timeout error.
234         http.operationTimeout = 0.seconds;
235         http.connectTimeout = 1.seconds;
236         // curl_easy_setopt(m_curl_handle, CURLOPT_NOSIGNAL, 1L);
237         http.tcpNoDelay = true;
238         //curl_easy_setopt(m_curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
239 
240         version(CURL_DEBUG)
241         {
242             http.verbose = true;
243         }
244 
245         http.onReceiveHeader = (in char[] key, in char[] value) { 
246             resp.addHeader(key.idup, value.idup); 
247         };
248         
249         http.onReceive = (ubyte[] data) {
250             resp.appendBody(cast(string)data);
251             return data.length;
252         };
253         
254         logDebug(LOG_TAG, "Making request to %s", _endpoints[Endpoints.NEXT]);
255         
256         auto curlCode = http.perform(No.throwOnError);
257         
258         if (curlCode != CURLE_OK)
259         {
260             string errorText = to!string(curl_easy_strerror(curlCode));
261             
262             logDebug(LOG_TAG, "CURL returned error code %d - %s", curlCode, errorText);
263             logError(LOG_TAG, "Failed to get next invocation. No Response from endpoint");
264             return new NextOutcome(ResponseCode.REQUEST_NOT_MADE);
265         }
266         
267         logDebug(LOG_TAG, "Completed request to %s", _endpoints[Endpoints.NEXT]);
268         resp.setResponseCode(cast(ResponseCode) http.statusLine.code); 
269 
270         if (!isSuccess(resp.getResponseCode()))
271         {
272             logError(LOG_TAG, "Failed to get next invocation. Http Response code: %d", resp.getResponseCode());
273             return new NextOutcome(resp.getResponseCode());
274         }
275 
276         if (!resp.hasHeader(REQUEST_ID_HEADER))
277         {
278             logError(LOG_TAG, "Failed to find header %s in response", REQUEST_ID_HEADER);
279             return new NextOutcome(ResponseCode.REQUEST_NOT_MADE);
280         }
281         
282         InvocationRequest req;
283         req.payload = resp.getBody();
284         req.requestId = resp.getHeader(REQUEST_ID_HEADER);
285 
286         if (resp.hasHeader(TRACE_ID_HEADER))
287         {
288             req.xrayTraceId = resp.getHeader(TRACE_ID_HEADER);
289         }
290 
291         if (resp.hasHeader(CLIENT_CONTEXT_HEADER))
292         {
293             req.clientContext = resp.getHeader(CLIENT_CONTEXT_HEADER);
294         }
295 
296         if (resp.hasHeader(COGNITO_IDENTITY_HEADER))
297         {
298             req.cognitoIdentity = resp.getHeader(COGNITO_IDENTITY_HEADER);
299         }
300 
301         if (resp.hasHeader(FUNCTION_ARN_HEADER))
302         {
303             req.functionArn = resp.getHeader(FUNCTION_ARN_HEADER);
304         }
305 
306         if (resp.hasHeader(DEADLINE_MS_HEADER))
307         {
308             import core.time : msecs;
309             
310             string deadlineString = resp.getHeader(DEADLINE_MS_HEADER);
311             ulong ms = to!ulong(deadlineString[0..10]);
312             assert(ms > 0 && ms < ulong.max);
313             req.deadline = SysTime.fromUnixTime(ms);
314             req.deadline.fracSecs = msecs(deadlineString[10..$].to!long);
315             
316             logInfo(LOG_TAG, "Received payload: %s\nTime remaining: %s", req.payload, req.getTimeRemaining());
317         }
318         return new NextOutcome(req);
319     }
320     
321     /// Tells lambda that the function has succeeded.
322     PostOutcome postSuccess(string requestId, InvocationResponse handlerResponse)
323     {
324         string url = _endpoints[Endpoints.RESULT] ~ requestId ~ "/response";
325         return doPost(url, requestId, handlerResponse);
326     }
327 
328     /// Tells lambda that the function has failed.
329     PostOutcome postFailure(string requestId, InvocationResponse handlerResponse)
330     {
331         string url = _endpoints[Endpoints.RESULT] ~ requestId ~ "/error";
332         return doPost(url, requestId, handlerResponse);
333     }
334     
335     private PostOutcome doPost(string url, string requestId, InvocationResponse handlerResponse)
336     {
337         auto http = HTTP(url);
338         http.method = HTTP.Method.post;
339         setUserAgentHeader(http);
340         http.operationTimeout = 0.seconds;
341         http.connectTimeout = 1.seconds;
342         // curl_easy_setopt(m_curl_handle, CURLOPT_NOSIGNAL, 1L);
343         http.tcpNoDelay = true;
344         //curl_easy_setopt(m_curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
345 
346         version(CURL_DEBUG)
347         {
348             http.verbose = true;
349         }
350         
351         logInfo(LOG_TAG, "Making request to %s", url);
352         string contentType = (handlerResponse.getContentType())
353             ? handlerResponse.getContentType : "text/html"; 
354         http.addRequestHeader("expect", "");
355         http.addRequestHeader("transfer-encoding", "");
356         string payload = handlerResponse.getPayload();
357         http.setPostData(payload, contentType);
358         http.contentLength = payload.length;
359        
360         logDebug(LOG_TAG, "calculating content length... %s", ("content-length: " ~ to!string(payload.length)));
361         
362         Response resp = new Response();
363         auto curlCode = http.perform(No.throwOnError);
364 
365         if (curlCode != CURLE_OK) {
366             string errorText = to!string(curl_easy_strerror(curlCode));
367             
368             logDebug(LOG_TAG, "CURL returned error code %d - %s, for invocation %s", curlCode,
369                 errorText,
370                 requestId);
371             return new PostOutcome(ResponseCode.REQUEST_NOT_MADE);
372         }
373 
374         if (!isSuccess(cast(ResponseCode) http.statusLine.code)) {
375             logError(LOG_TAG, "Failed to post handler success response. Http response code: %ld.", http.statusLine.code);
376             return new PostOutcome(cast (ResponseCode) http.statusLine.code);
377         }
378         return new PostOutcome(NoResult());
379     }    
380 }
381 
382 bool handlePostOutcome(Runtime.PostOutcome o, string requestId)
383 {
384     if (o.isSuccess())
385     {
386         return true;
387     }
388 
389     if (o.getFailure() == ResponseCode.REQUEST_NOT_MADE)
390     {
391         logError(LOG_TAG, "Failed to send HTTP request for invocation %s.", requestId);
392         return false;
393     }
394 
395     logInfo(LOG_TAG, "HTTP Request for invocation %s was not successful. HTTP response code: %d.", requestId, o.getFailure());
396     return false;
397 }
398 
399 struct NoResult {}
400