Si vous regardez de plus près la documentation de l'API, vous trouverez la date et l'heure de la dernière connexion LastKnownUserConnectionTimestamp
dans DescribeWorkspacesConnectionStatus!
DescribeWorkspacesConnectionStatus
//Response Syntax
{
"NextToken": "string",
"WorkspacesConnectionStatus": [
{
"ConnectionState": "string",
"ConnectionStateCheckTimestamp": number,
"LastKnownUserConnectionTimestamp": number, //<-cette!
"WorkspaceId": "string"
}
]
}
Cependant, il semble que vous ne puissiez demander que 25 instances maximum après avoir placé WorkspaceId dans cette API et cette requête. S'il y en a 25 ou plus, il semble qu'il ne soit pas possible de tout acquérir en même temps.
WorkspaceIds The identifiers of the WorkSpaces. You can specify up to 25 WorkSpaces.
getInstances.py
import boto3
import csv
# Initialize SDK client for AWS Workspaces
client = boto3.client('workspaces')
paginator_describe = client.get_paginator('describe_workspaces')
paginator_connection = client.get_paginator('describe_workspaces_connection_status')
# Initialize output CSV header
data = [["UserName", "WorkspaceId", "Directory", "State", "ComputerType", "LastLogin"]]
# Initialize Directory ID list
DirectoryIds = [
'd-XXXXXXXX',
'd-YYYYYYYY'
]
# Initialize mapping between Directory ID and Name
def addDirectoryMapping(direcotry):
directoryMapping[direcotry["DirectoryId"]] = direcotry["Alias"]
directoryMapping = {}
paginator = client.get_paginator('describe_workspace_directories')
page_iterator = paginator.paginate()
for page in page_iterator:
map(addDirectoryMapping, page["Directories"])
print("Direcotry mapping is DONE")
# Find Directory Name from ID
# @param workspace ID
# @return Name of directory
def findDirectoryName(direcotyrId):
if direcotyrId in directoryMapping:
return directoryMapping[direcotyrId]
else:
return "Other"
# Find last login time by SingleId
# @param workspace ID
# @return LastLogin Time
def findLatestLoginTimeById(workspaceId):
response = client.describe_workspaces_connection_status(
WorkspaceIds=[
workspaceId
]
)
if not len(response["WorkspacesConnectionStatus"])==0:
connectionStatus = response["WorkspacesConnectionStatus"][0]
if "LastKnownUserConnectionTimestamp" in connectionStatus:
return connectionStatus["LastKnownUserConnectionTimestamp"]
else:
return "unknown"
else:
return "Error"
# Get necessary information from Workspace Instance
# @param workspaces
# @return list
def parse_list(workspaces):
list=[]
list.append(workspaces["UserName"])
list.append(workspaces["WorkspaceId"])
list.append(findDirectoryName(workspaces["DirectoryId"]))
list.append(workspaces["State"])
list.append(workspaces["WorkspaceProperties"]["ComputeTypeName"])
list.append(findLatestLoginTimeById(workspaces["WorkspaceId"]))
return list
# Get necessary information from Workspace Connection Status
# @param WorkspacesConnectionStatus
# @return list
def parse_list_connection(WorkspacesConnectionStatus):
list=[]
list.append(WorkspacesConnectionStatus["WorkspaceId"])
if "LastKnownUserConnectionTimestamp" in WorkspacesConnectionStatus:
list.append(WorkspacesConnectionStatus["LastKnownUserConnectionTimestamp"])
else:
list.append("unknown")
return list
# Export CSV file
def exportCSV(data):
with open("VDI_LIST.csv", "w") as f:
writer = csv.writer(f, lineterminator="\n")
writer.writerows(data)
print("Create File is DONE")
# main function
def describeWorkspacesByDirectoryId (Id):
try:
workspacesDescribeInDirectory = []
print(Id + " is started")
# describe workspaces
page_iterator_describe = paginator_describe.paginate(DirectoryId=Id,PaginationConfig={'PageSize': 10})
for page in page_iterator_describe:
workspacesDescribeInDirectory = map(parse_list, page["Workspaces"])
data.extend(workspacesDescribeInDirectory)
print(Id + " describe_workspaces is DONE")
except Exception as e:
print(e.message + " :: impacted Directory ID is " + Id)
map(describeWorkspacesByDirectoryId, DirectoryIds)
exportCSV(data)
Recommended Posts