1 /* 2 * Copyright 2018-present Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 /// 17 module aws.lambda_runtime.runtime; 18 19 import core.time : seconds; 20 import std.datetime.systime : SysTime, Clock; 21 import std.process : environment; 22 import std.conv : to; 23 import std.net.curl; 24 import std.typecons : No; 25 import etc.c.curl : curl_easy_strerror, curl_version; 26 27 import aws.http.response; 28 import aws.logging.logging; 29 import aws.lambda_runtime.outcome; 30 import aws.lambda_runtime.version_; 31 32 /// Entry method 33 void runHandler(InvocationResponse function(InvocationRequest) handler) 34 { 35 logInfo(LOG_TAG, "Initializing the D Lambda Runtime version %s", getVersion()); 36 string endpoint = "http://"; 37 string endpointHost = environment.get("AWS_LAMBDA_RUNTIME_API", ""); 38 assert(endpointHost != "", "LAMBDA_SERVER_ADDRESS not defined"); 39 logDebug(LOG_TAG, "LAMBDA_SERVER_ADDRESS defined in environment as: %s", endpointHost); 40 endpoint ~= endpointHost; 41 42 Runtime rt = new Runtime(endpoint); 43 size_t retries = 0; 44 size_t maxRetries = 3; 45 46 while (retries < maxRetries) { 47 auto nextOutcome = rt.getNext(); 48 if (!nextOutcome.isSuccess()) { 49 if (nextOutcome.getFailure() == ResponseCode.REQUEST_NOT_MADE) 50 { 51 ++retries; 52 continue; 53 } 54 55 logInfo(LOG_TAG, "HTTP request was not successful. HTTP response code: %d. Retrying..", nextOutcome.getFailure()); 56 ++retries; 57 continue; 58 } 59 60 retries = 0; 61 auto req = nextOutcome.getResult(); 62 logInfo(LOG_TAG, "Invoking user handler"); 63 InvocationResponse res = handler(req); 64 logInfo(LOG_TAG, "Invoking user handler completed."); 65 66 if (res.isSuccess()) { 67 auto postOutcome = rt.postSuccess(req.requestId, res); 68 if (!handlePostOutcome(postOutcome, req.requestId)) 69 { 70 return; // TODO: implement a better retry strategy 71 } 72 } 73 else { 74 auto postOutcome = rt.postFailure(req.requestId, res); 75 if (!handlePostOutcome(postOutcome, req.requestId)) 76 { 77 return; // TODO: implement a better retry strategy 78 } 79 } 80 } 81 82 if (retries == maxRetries) 83 { 84 string libCurlVersion = to!string(curl_version()); 85 logError(LOG_TAG, "Exhausted all retries. This is probably a bug in libcurl v" ~ libCurlVersion ~ " Exiting!"); 86 } 87 } 88 89 /// 90 struct InvocationRequest 91 { 92 /// The user's payload represented as a UTF-8 string. 93 string payload; 94 95 /// An identifier unique to the current invocation. 96 string requestId; 97 98 /// X-Ray tracing ID of the current invocation. 99 string xrayTraceId; 100 101 /// Information about the client application and device when invoked through the AWS Mobile SDK. 102 string clientContext; 103 104 /// Information about the Amazon Cognito identity provider when invoked through the AWS Mobile SDK. 105 string cognitoIdentity; 106 107 /// The ARN requested. This can be different in each invoke that executes the same version. 108 string functionArn; 109 110 /// Function execution deadline counted in milliseconds since the Unix epoch. 111 SysTime deadline; 112 113 /// The number of milliseconds left before lambda terminates the current execution. 114 long getTimeRemaining() 115 { 116 return (deadline - Clock.currTime()).total!"msecs"; 117 } 118 } 119 120 /// 121 class InvocationResponse 122 { 123 /// Create a successful invocation response with the given payload and content-type. 124 static InvocationResponse success(string payload, string contentType) 125 { 126 InvocationResponse r = new InvocationResponse(); 127 r._success = true; 128 r._contentType = contentType; 129 r._payload = payload; 130 return r; 131 } 132 133 /** 134 * Create a failure response with the given error message and error type. 135 * The content-type is always set to application/json in this case. 136 */ 137 static InvocationResponse failure(string errorMessage, string errorType) 138 { 139 import std.json; 140 141 InvocationResponse r = new InvocationResponse(); 142 r._success = false; 143 r._contentType = "application/json"; 144 JSONValue jsPayload = JSONValue(["errorMessage": JSONValue(errorMessage), "errorType": JSONValue(errorType), "stackTrace": JSONValue(string[].init)]); 145 r._payload = jsPayload.toString(); 146 return r; 147 } 148 149 /// Get the MIME type of the payload. 150 string getContentType() 151 { 152 return _contentType; 153 } 154 155 /// Get the payload string. The string is assumed to be UTF-8 encoded. 156 string getPayload() 157 { 158 return _payload; 159 } 160 161 /// Returns true if the payload and content-type are set. Returns false if the error message and error types are set. 162 bool isSuccess() 163 { 164 return _success; 165 } 166 private: 167 // The output of the function which is sent to the lambda caller. 168 string _payload; 169 170 // The MIME type of the payload. This is always set to 'application/json' in unsuccessful invocations. 171 string _contentType; 172 173 // Flag to distinguish if the contents are for successful or unsuccessful invocations. 174 bool _success; 175 } 176 177 private: 178 179 enum LOG_TAG = "LAMBDA_RUNTIME"; 180 enum REQUEST_ID_HEADER = "lambda-runtime-aws-request-id"; 181 enum TRACE_ID_HEADER = "lambda-runtime-trace-id"; 182 enum CLIENT_CONTEXT_HEADER = "lambda-runtime-client-context"; 183 enum COGNITO_IDENTITY_HEADER = "lambda-runtime-cognito-identity"; 184 enum DEADLINE_MS_HEADER = "lambda-runtime-deadline-ms"; 185 enum FUNCTION_ARN_HEADER = "lambda-runtime-invoked-function-arn"; 186 187 enum Endpoints 188 { 189 INIT, 190 NEXT, 191 RESULT, 192 } 193 194 bool isSuccess(ResponseCode httpcode) 195 { 196 return httpcode >= 200 && httpcode <= 299; 197 } 198 199 void setUserAgentHeader(HTTP http) 200 { 201 static string userAgent = "AWS_Lambda_D/" ~ getVersion(); 202 http.setUserAgent(userAgent); 203 } 204 205 enum CURLE_OK = 0; 206 207 class Runtime 208 { 209 alias NextOutcome = Outcome!(InvocationRequest, ResponseCode); 210 alias PostOutcome = Outcome!(NoResult, ResponseCode); 211 212 private string[] _endpoints; 213 214 this(string endpoint) 215 { 216 _endpoints = [ 217 endpoint ~ "/2018-06-01/runtime/init/error", 218 endpoint ~ "/2018-06-01/runtime/invocation/next", 219 endpoint ~ "/2018-06-01/runtime/invocation/" 220 ]; 221 } 222 223 // Ask lambda for an invocation. 224 NextOutcome getNext() 225 { 226 Response resp = new Response(); 227 228 auto http = HTTP(_endpoints[Endpoints.NEXT]); 229 http.method = HTTP.Method.get; 230 setUserAgentHeader(http); 231 232 // lambda freezes the container when no further tasks are available. The freezing period could be longer than the 233 // request timeout, which causes the following get_next request to fail with a timeout error. 234 http.operationTimeout = 0.seconds; 235 http.connectTimeout = 1.seconds; 236 // curl_easy_setopt(m_curl_handle, CURLOPT_NOSIGNAL, 1L); 237 http.tcpNoDelay = true; 238 //curl_easy_setopt(m_curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); 239 240 version(CURL_DEBUG) 241 { 242 http.verbose = true; 243 } 244 245 http.onReceiveHeader = (in char[] key, in char[] value) { 246 resp.addHeader(key.idup, value.idup); 247 }; 248 249 http.onReceive = (ubyte[] data) { 250 resp.appendBody(cast(string)data); 251 return data.length; 252 }; 253 254 logDebug(LOG_TAG, "Making request to %s", _endpoints[Endpoints.NEXT]); 255 256 auto curlCode = http.perform(No.throwOnError); 257 258 if (curlCode != CURLE_OK) 259 { 260 string errorText = to!string(curl_easy_strerror(curlCode)); 261 262 logDebug(LOG_TAG, "CURL returned error code %d - %s", curlCode, errorText); 263 logError(LOG_TAG, "Failed to get next invocation. No Response from endpoint"); 264 return new NextOutcome(ResponseCode.REQUEST_NOT_MADE); 265 } 266 267 logDebug(LOG_TAG, "Completed request to %s", _endpoints[Endpoints.NEXT]); 268 resp.setResponseCode(cast(ResponseCode) http.statusLine.code); 269 270 if (!isSuccess(resp.getResponseCode())) 271 { 272 logError(LOG_TAG, "Failed to get next invocation. Http Response code: %d", resp.getResponseCode()); 273 return new NextOutcome(resp.getResponseCode()); 274 } 275 276 if (!resp.hasHeader(REQUEST_ID_HEADER)) 277 { 278 logError(LOG_TAG, "Failed to find header %s in response", REQUEST_ID_HEADER); 279 return new NextOutcome(ResponseCode.REQUEST_NOT_MADE); 280 } 281 282 InvocationRequest req; 283 req.payload = resp.getBody(); 284 req.requestId = resp.getHeader(REQUEST_ID_HEADER); 285 286 if (resp.hasHeader(TRACE_ID_HEADER)) 287 { 288 req.xrayTraceId = resp.getHeader(TRACE_ID_HEADER); 289 } 290 291 if (resp.hasHeader(CLIENT_CONTEXT_HEADER)) 292 { 293 req.clientContext = resp.getHeader(CLIENT_CONTEXT_HEADER); 294 } 295 296 if (resp.hasHeader(COGNITO_IDENTITY_HEADER)) 297 { 298 req.cognitoIdentity = resp.getHeader(COGNITO_IDENTITY_HEADER); 299 } 300 301 if (resp.hasHeader(FUNCTION_ARN_HEADER)) 302 { 303 req.functionArn = resp.getHeader(FUNCTION_ARN_HEADER); 304 } 305 306 if (resp.hasHeader(DEADLINE_MS_HEADER)) 307 { 308 import core.time : msecs; 309 310 string deadlineString = resp.getHeader(DEADLINE_MS_HEADER); 311 ulong ms = to!ulong(deadlineString[0..10]); 312 assert(ms > 0 && ms < ulong.max); 313 req.deadline = SysTime.fromUnixTime(ms); 314 req.deadline.fracSecs = msecs(deadlineString[10..$].to!long); 315 316 logInfo(LOG_TAG, "Received payload: %s\nTime remaining: %s", req.payload, req.getTimeRemaining()); 317 } 318 return new NextOutcome(req); 319 } 320 321 /// Tells lambda that the function has succeeded. 322 PostOutcome postSuccess(string requestId, InvocationResponse handlerResponse) 323 { 324 string url = _endpoints[Endpoints.RESULT] ~ requestId ~ "/response"; 325 return doPost(url, requestId, handlerResponse); 326 } 327 328 /// Tells lambda that the function has failed. 329 PostOutcome postFailure(string requestId, InvocationResponse handlerResponse) 330 { 331 string url = _endpoints[Endpoints.RESULT] ~ requestId ~ "/error"; 332 return doPost(url, requestId, handlerResponse); 333 } 334 335 private PostOutcome doPost(string url, string requestId, InvocationResponse handlerResponse) 336 { 337 auto http = HTTP(url); 338 http.method = HTTP.Method.post; 339 setUserAgentHeader(http); 340 http.operationTimeout = 0.seconds; 341 http.connectTimeout = 1.seconds; 342 // curl_easy_setopt(m_curl_handle, CURLOPT_NOSIGNAL, 1L); 343 http.tcpNoDelay = true; 344 //curl_easy_setopt(m_curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); 345 346 version(CURL_DEBUG) 347 { 348 http.verbose = true; 349 } 350 351 logInfo(LOG_TAG, "Making request to %s", url); 352 string contentType = (handlerResponse.getContentType()) 353 ? handlerResponse.getContentType : "text/html"; 354 http.addRequestHeader("expect", ""); 355 http.addRequestHeader("transfer-encoding", ""); 356 string payload = handlerResponse.getPayload(); 357 http.setPostData(payload, contentType); 358 http.contentLength = payload.length; 359 360 logDebug(LOG_TAG, "calculating content length... %s", ("content-length: " ~ to!string(payload.length))); 361 362 Response resp = new Response(); 363 auto curlCode = http.perform(No.throwOnError); 364 365 if (curlCode != CURLE_OK) { 366 string errorText = to!string(curl_easy_strerror(curlCode)); 367 368 logDebug(LOG_TAG, "CURL returned error code %d - %s, for invocation %s", curlCode, 369 errorText, 370 requestId); 371 return new PostOutcome(ResponseCode.REQUEST_NOT_MADE); 372 } 373 374 if (!isSuccess(cast(ResponseCode) http.statusLine.code)) { 375 logError(LOG_TAG, "Failed to post handler success response. Http response code: %ld.", http.statusLine.code); 376 return new PostOutcome(cast (ResponseCode) http.statusLine.code); 377 } 378 return new PostOutcome(NoResult()); 379 } 380 } 381 382 bool handlePostOutcome(Runtime.PostOutcome o, string requestId) 383 { 384 if (o.isSuccess()) 385 { 386 return true; 387 } 388 389 if (o.getFailure() == ResponseCode.REQUEST_NOT_MADE) 390 { 391 logError(LOG_TAG, "Failed to send HTTP request for invocation %s.", requestId); 392 return false; 393 } 394 395 logInfo(LOG_TAG, "HTTP Request for invocation %s was not successful. HTTP response code: %d.", requestId, o.getFailure()); 396 return false; 397 } 398 399 struct NoResult {} 400